Posted to commits@ignite.apache.org by mm...@apache.org on 2022/01/21 10:53:05 UTC
[ignite-extensions] branch master updated: IGNITE-16331 Add topology validator implementation extension. (#86)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new aa2a168 IGNITE-16331 Add topology validator implementation extension. (#86)
aa2a168 is described below
commit aa2a16864f7fa873ac86c0579b78f1225f77cb2d
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Fri Jan 21 13:52:57 2022 +0300
IGNITE-16331 Add topology validator implementation extension. (#86)
---
modules/topology-validator-ext/README.md | 68 +++
.../modules/core/src/test/config/log4j-test.xml | 40 ++
.../modules/core/src/test/config/tests.properties | 0
modules/topology-validator-ext/pom.xml | 67 +++
.../CacheTopologyValidatorPluginProvider.java | 372 +++++++++++++
.../cache/IgniteCacheTopologyValidatorTest.java | 616 +++++++++++++++++++++
.../IgniteCacheTopologyValidatorTestSuite.java | 27 +
pom.xml | 1 +
8 files changed, 1191 insertions(+)
diff --git a/modules/topology-validator-ext/README.md b/modules/topology-validator-ext/README.md
new file mode 100644
index 0000000..3e61055
--- /dev/null
+++ b/modules/topology-validator-ext/README.md
@@ -0,0 +1,68 @@
+# What problem is this module intended to solve?
+
+Some network issues can cause the Ignite cluster to split into several isolated parts, called segments. Nodes from different
+segments cannot communicate with each other, while nodes within the same segment do not experience communication problems.
+In this case, each segment marks the nodes it lost connection with as failed and considers itself an
+independent Ignite cluster. We call this scenario cluster segmentation.
+
+Cluster segmentation can lead to cache data inconsistency across different segments because each segment can continue
+to handle cache update requests independently.
+
+Apache Ignite allows the user to provide custom validation logic for Ignite caches that is applied on
+each topology change; if the validation fails, writes to the corresponding cache are blocked. This
+validation logic is passed to Ignite as an implementation of the TopologyValidator interface. It can be set through the cache
+configuration or through the Ignite plugin extension mechanism (see the CacheTopologyValidatorProvider interface).
+
+This module provides an Ignite plugin that guarantees that,
+after cluster segmentation, at most one segment can process write requests to caches. This is achieved by
+providing an implementation of the TopologyValidator interface as described above.
+
+The current TopologyValidator implementation detects segmentation from the number of Ignite baseline nodes remaining in
+the topology.
+
+# When are cache writes blocked for a segment?
+
+The following rules are used to determine which segment can process cache write requests after segmentation and which
+cannot:
+
+1. A segment is allowed to process cache write requests after segmentation if and only if more than the configured
+fraction of the baseline nodes remain in the segment; otherwise all writes to caches are blocked.
+2. If the cluster is split into two equal segments, writes to both of them are blocked.
+3. Since Ignite treats segmentation as a sequence of node failures, even a single node failure in an active cluster is
+considered segmentation if it leaves no more than the configured fraction of the baseline nodes alive, and it results
+in writes being blocked for all caches.
+
+# Configuration
+
+1. Configure CacheTopologyValidatorPluginProvider on each server node:
+
+ ```
+ new IgniteConfiguration()
+ ...
+ .setPluginProviders(new CacheTopologyValidatorPluginProvider());
+ ```
+
+2. Configure baseline nodes explicitly, or configure baseline auto adjustment with a timeout that significantly
+exceeds the node failure detection timeout. This can be done through the Java API or the control script.
+See [1] and [2] for more info.
+
+Do not use baseline auto adjustment with a zero timeout together with the current TopologyValidator implementation:
+in that configuration the plugin logs a warning and skips validation of topology changes.
+
+3. Configure the deactivation threshold.
+The deactivation threshold is a fraction of the baseline nodes: a segment is considered valid and keeps accepting
+write requests only if more than this fraction of the baseline nodes remain in it.
+The value must be in the range from 0.5 (inclusive) to 1 (exclusive). The default value is 0.5. If the default value
+suits you, no action is required.
+
+To set a custom deactivation threshold, set the `org.apache.ignite.topology.validator.deactivation.threshold`
+distributed configuration property via the control script (see https://ignite.apache.org/docs/latest/tools/control-script#working-with-cluster-properties).
+
+# Manual segmentation resolution
+
+Each segment for which cache writes were blocked is eventually switched to the READ-ONLY cluster state.
+Manually switching the cluster state back to ACTIVE restores cache write availability. This can be done through the
+Java API or the control script. See [1] and [2] for more info.
+
+[1] - https://ignite.apache.org/docs/latest/clustering/baseline-topology \
+[2] - https://ignite.apache.org/docs/latest/tools/control-script#activation-deactivation-and-topology-management
\ No newline at end of file
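The segment-validity rules in the README reduce to one integer comparison, the one made in `isValidTopology()` of `CacheTopologyValidatorPluginProvider` further down in this commit: a segment stays writable only if `aliveBaselineNodes >= (int)(baselineSize * threshold) + 1`. The standalone sketch below reproduces that comparison outside Ignite so the three rules can be checked with concrete numbers. The class `SegmentQuorumSketch` and its method `isWritable` are hypothetical, use no Ignite API, and do not model the threshold value 0 (validation disabled).

```java
/** Hypothetical standalone sketch of the plugin's baseline quorum check; no Ignite API is used. */
public class SegmentQuorumSketch {
    /**
     * Returns whether a segment may keep accepting cache writes. Mirrors isValidTopology():
     * strictly more than baselineSize * threshold baseline nodes must remain alive.
     */
    static boolean isWritable(int aliveBaselineNodes, int baselineSize, float threshold) {
        // Same accepted range as the distributed property parser (0, which disables validation, is not modeled).
        if (threshold < 0.5F || threshold >= 1F)
            throw new IllegalArgumentException("Threshold must be in the range [0.5, 1): " + threshold);

        // Mirrors the early return in isValidTopology() when no baseline node is alive.
        if (aliveBaselineNodes == 0)
            return true;

        return aliveBaselineNodes >= ((int)(baselineSize * threshold)) + 1;
    }

    public static void main(String[] args) {
        // Rule 1: a 5-node baseline split 3/2 with the default threshold 0.5.
        System.out.println(isWritable(3, 5, 0.5F)); // true: 3 >= (int)(2.5) + 1
        System.out.println(isWritable(2, 5, 0.5F)); // false

        // Rule 2: a 4-node baseline split 2/2 blocks writes in both segments.
        System.out.println(isWritable(2, 4, 0.5F)); // false: 2 < (int)(2.0) + 1

        // Rule 3: sequential failures in a 4-node baseline, checked after each failure.
        System.out.println(isWritable(3, 4, 0.5F)); // true after the first failure
        System.out.println(isWritable(2, 4, 0.5F)); // false after the second failure

        // A stricter threshold: with 0.75, losing a single node of four already blocks writes.
        System.out.println(isWritable(3, 4, 0.75F)); // false: 3 < (int)(3.0) + 1
    }
}
```

With the default threshold of 0.5 the formula amounts to a strict majority of the baseline, which is why an even split blocks both halves.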
diff --git a/modules/topology-validator-ext/modules/core/src/test/config/log4j-test.xml b/modules/topology-validator-ext/modules/core/src/test/config/log4j-test.xml
new file mode 100644
index 0000000..323c348
--- /dev/null
+++ b/modules/topology-validator-ext/modules/core/src/test/config/log4j-test.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="DEBUG"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+
+ <filter class="org.apache.log4j.varia.LevelRangeFilter">
+ <param name="levelMin" value="DEBUG"/>
+ <param name="levelMax" value="INFO"/>
+ </filter>
+ </appender>
+
+ <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.err"/>
+
+ <param name="Threshold" value="WARN"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org">
+ <level value="INFO"/>
+ </category>
+
+ <root>
+ <level value="INFO"/>
+
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="CONSOLE_ERR"/>
+ </root>
+</log4j:configuration>
diff --git a/modules/topology-validator-ext/modules/core/src/test/config/tests.properties b/modules/topology-validator-ext/modules/core/src/test/config/tests.properties
new file mode 100644
index 0000000..e69de29
diff --git a/modules/topology-validator-ext/pom.xml b/modules/topology-validator-ext/pom.xml
new file mode 100644
index 0000000..05fa3b2
--- /dev/null
+++ b/modules/topology-validator-ext/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-extensions-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-topology-validator-ext</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java b/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
new file mode 100644
index 0000000..b3c8d48
--- /dev/null
+++ b/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
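+/**
+ * Plugin provider that registers this {@link TopologyValidator} for all caches on server nodes in order to block
+ * cache writes after cluster segmentation.
+ */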
+public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator {
+ /**
+ * Decimal fraction of baseline nodes that determines how many nodes must remain in a segment for it to be
+ * considered valid and to keep accepting write requests after segmentation. This value must be in
+ * the range from 0.5 (inclusive) to 1 (exclusive), or 0 if validation should be disabled.
+ *
+ * @see #DFLT_DEACTIVATION_THRESHOLD that is used as a default value.
+ */
+ public static final String TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME =
+ "org.apache.ignite.topology.validator.deactivation.threshold";
+
+ /** Default deactivation threshold. */
+ public static final float DFLT_DEACTIVATION_THRESHOLD = 0.5F;
+
+ /** */
+ private static final int[] TOP_CHANGED_EVTS = new int[] {
+ EVT_NODE_LEFT,
+ EVT_NODE_JOINED,
+ EVT_NODE_FAILED
+ };
+
+ /** */
+ private final SimpleDistributedProperty<Float> deactivateThresholdProp = new SimpleDistributedProperty<>(
+ TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME,
+ str -> {
+ float res = Float.parseFloat(str);
+
+ if ((res < 0.5F || res >= 1F) && res != 0F) {
+ throw new IgniteException("Topology validator cluster deactivation threshold must be a decimal" +
+ " fraction in the range from 0.5 (inclusively) to 1 or 0 if validation should be disabled.");
+ }
+
+ return res;
+ }
+ );
+
+ /** Ignite kernel context. */
+ private GridKernalContext ctx;
+
+ /** Ignite logger. */
+ private IgniteLogger log;
+
+ /** Topology version of the last discovery event handled by {@link TopologyChangedEventListener}. */
+ private long lastCheckedTopVer;
+
+ /**
+ * {@code null} value means that segmentation happened, cache writes were blocked and cluster is in process of
+ * switching its state to READ-ONLY mode.
+ */
+ private volatile ClusterState state;
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "Topology Validator";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return "1.0.0";
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends IgnitePlugin> T plugin() {
+ return (T) new IgnitePlugin() {
+ // No-op.
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public String copyright() {
+ return "";
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) {
+ ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+ if (!ctx.clientNode()) {
+ registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() {
+ /** {@inheritDoc} */
+ @Override public TopologyValidator topologyValidator(String cacheName) {
+ return CacheTopologyValidatorPluginProvider.this;
+ }
+ });
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(PluginContext pluginCtx) {
+ if (ctx.clientNode())
+ return;
+
+ log = ctx.log(getClass());
+
+ state = ctx.state().clusterState().state();
+
+ ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+ ctx.discovery().setCustomEventListener(
+ ChangeGlobalStateFinishMessage.class,
+ (topVer, snd, msg) -> state = msg.state()
+ );
+
+ ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) ->
+ validateBaselineConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal));
+
+ ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) ->
+ validateBaselineConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout())
+ );
+
+ ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+ new DistributedConfigurationLifecycleListener() {
+ /** {@inheritDoc} */
+ @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+ dispatcher.registerProperty(deactivateThresholdProp);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyToWrite() {
+ float deactivationThreshold = U.isLocalNodeCoordinator(ctx.discovery())
+ ? DFLT_DEACTIVATION_THRESHOLD
+ : deactivateThresholdProp.getOrDefault(0F);
+
+ if (deactivationThreshold == 0F) {
+ U.warn(log, "Topology Validator will be disabled because it is not configured for the" +
+ " cluster the current node joined. Make sure the Topology Validator plugin is" +
+ " configured on all cluster nodes.");
+ }
+
+ setDefaultValue(deactivateThresholdProp, deactivationThreshold, log);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+ return state;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+ if (ctx.localNodeId().equals(nodeId))
+ state = (ClusterState)data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException {
+ if (node.isClient())
+ return;
+
+ if (data == null) {
+ String msg = "The Topology Validator plugin is not configured for the server node that is" +
+ " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" +
+ " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']';
+
+ throw new PluginValidationException(msg, msg, node.id());
+ }
+
+ // If a new node is joining while some node failed/left events have not yet been handled by
+ // {@link TopologyChangedEventListener}, we cannot guarantee that the {@code state} on the joining node will
+ // be consistent with the state on the cluster nodes.
+ if (state == ACTIVE) {
+ DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+ if (discoCache != null) {
+ for (ClusterNode srv : discoCache.serverNodes()) {
+ if (!ctx.discovery().alive(srv)) {
+ String msg = "Node join request was rejected due to concurrent node left" +
+ " process handling [rejectedNodeId=" + node.id() + ']';
+
+ throw new PluginValidationException(msg, msg, node.id());
+ }
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean validate(Collection<ClusterNode> nodes) {
+ return isDisabled() || state != null;
+ }
+
+ /** */
+ private boolean isDisabled() {
+ return deactivateThresholdProp.getOrDefault(0F) == 0F;
+ }
+
+ /** @return Whether enough baseline nodes remain alive for the segment to keep accepting writes. */
+ private boolean isValidTopology(Collection<? extends BaselineNode> baselineNodes) {
+ int aliveBaselineNodes = F.size(baselineNodes, n -> !(n instanceof DetachedClusterNode));
+
+ if (aliveBaselineNodes == 0)
+ return true;
+
+ float threshold = deactivateThresholdProp.getOrDefault(DFLT_DEACTIVATION_THRESHOLD);
+
+ assert threshold >= 0.5F && threshold < 1;
+
+ // Ignite treats segmentation as a sequence of node failures, so segmentation is detected even when only a single
+ // node fails if, after that failure, no more than the threshold fraction of the baseline nodes remain alive.
+ return aliveBaselineNodes >= ((int)(baselineNodes.size() * threshold)) + 1;
+ }
+
+ /** */
+ private void validateBaselineConfiguration(Boolean enabled, Long autoAdjustmentTimeout) {
+ if (isDisabled() || enabled == null || autoAdjustmentTimeout == null)
+ return;
+
+ if (!isBaselineConfigurationCompatible(enabled, autoAdjustmentTimeout)) {
+ LT.warn(log, "Topology Validator is currently skipping validation of topology changes because" +
+ " Baseline Auto Adjustment with zero timeout is configured for the cluster. Configure Baseline Nodes" +
+ " explicitly or set Baseline Auto Adjustment Timeout to greater than zero.");
+ }
+ }
+
+ /**
+ * The current segmentation detection compares the nodes of each topology with the configured baseline nodes.
+ * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change
+ * and the comparison described above makes no sense.
+ */
+ private boolean isBaselineConfigurationCompatible(boolean enabled, long autoAdjustmentTimeout) {
+ return !(enabled && autoAdjustmentTimeout == 0L);
+ }
+
+ /** */
+ private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener {
+ /** {@inheritDoc} */
+ @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+ ClusterState locStateCopy = state;
+
+ lastCheckedTopVer = evt.topologyVersion();
+
+ boolean isTopValidationApplicable =
+ !isDisabled() &&
+ state == ACTIVE &&
+ evt.type() == EVT_NODE_FAILED &&
+ isBaselineConfigurationCompatible(
+ ctx.state().isBaselineAutoAdjustEnabled(),
+ ctx.state().baselineAutoAdjustTimeout()
+ );
+
+ if (isTopValidationApplicable && !isValidTopology(discoCache.baselineNodes())) {
+ locStateCopy = null;
+
+ try {
+ ctx.closure().runLocal(new GridPlainRunnable() {
+ @Override public void run() {
+ try {
+ ctx.cluster().get().state(ACTIVE_READ_ONLY);
+ }
+ catch (Throwable e) {
+ U.error(log,
+ "Failed to automatically switch state of the segmented cluster to the READ-ONLY" +
+ " mode. Cache writes were already restricted for all configured caches, but this" +
+ " step is still required in order to be able to unlock cache writes in the future." +
+ " Retry this operation manually, if possible [segmentedNodes=" +
+ F.viewReadOnly(discoCache.allNodes(), F.node2id()) + "]", e);
+ }
+ }
+ }, PUBLIC_POOL);
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to schedule cluster state change to the READ-ONLY mode.", e);
+ }
+
+ U.warn(log, "Cluster segmentation was detected. Writes to all user caches were blocked" +
+ " [segmentedNodes=" + F.viewReadOnly(discoCache.allNodes(), F.node2id()) + ']');
+ }
+
+ state = locStateCopy;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int order() {
+ return 0;
+ }
+ }
+}
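The discovery listener, `validate()` and the `ChangeGlobalStateFinishMessage` handler above together drive the volatile `state` field through a small lifecycle: ACTIVE, then `null` once segmentation is detected (writes blocked), then ACTIVE_READ_ONLY when the automatic state change completes, and back to ACTIVE only after manual activation. The following is a simplified single-threaded model of that lifecycle, not the plugin itself. The class `ValidatorStateSketch` and its nested `State` enum (a stand-in for `org.apache.ignite.cluster.ClusterState`) are hypothetical, and the baseline auto-adjust compatibility check and the asynchronous ACTIVE_READ_ONLY request are omitted.

```java
/** Hypothetical single-threaded model of the plugin's state lifecycle; not the plugin implementation. */
public class ValidatorStateSketch {
    /** Stand-in for org.apache.ignite.cluster.ClusterState. */
    enum State { INACTIVE, ACTIVE, ACTIVE_READ_ONLY }

    /** null means: segmentation detected, writes blocked, switch to read-only still in progress. */
    private State state = State.ACTIVE;

    private final int baselineSize;

    /** 0 disables validation, as with the distributed property. */
    private final float threshold;

    ValidatorStateSketch(int baselineSize, float threshold) {
        this.baselineSize = baselineSize;
        this.threshold = threshold;
    }

    /** Mirrors validate(): writes pass unless validation is enabled and state is null. */
    boolean validate() {
        return threshold == 0F || state != null;
    }

    /** Mirrors the listener: only node failures in an ACTIVE cluster are checked. */
    void onNodeFailed(int aliveBaselineNodes) {
        if (threshold == 0F || state != State.ACTIVE)
            return;

        boolean valid = aliveBaselineNodes == 0
            || aliveBaselineNodes >= ((int)(baselineSize * threshold)) + 1;

        // The real plugin additionally schedules a switch to ACTIVE_READ_ONLY here.
        if (!valid)
            state = null;
    }

    /** Mirrors the ChangeGlobalStateFinishMessage listener. */
    void onStateChangeFinished(State newState) {
        state = newState;
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        ValidatorStateSketch v = new ValidatorStateSketch(4, 0.5F);

        v.onNodeFailed(3);                // 3 of 4 baseline nodes alive: still valid
        System.out.println(v.validate()); // true

        v.onNodeFailed(2);                // 2 of 4 alive: segmentation detected
        System.out.println(v.validate()); // false

        v.onStateChangeFinished(State.ACTIVE_READ_ONLY); // automatic switch completes
        v.onNodeFailed(1);                // ignored: the cluster is not ACTIVE
        System.out.println(v.state());    // ACTIVE_READ_ONLY

        v.onStateChangeFinished(State.ACTIVE); // manual reactivation
        System.out.println(v.validate()); // true
    }
}
```

Note that `validate()` returns `true` again once the state reaches ACTIVE_READ_ONLY; from that point writes stay blocked by the read-only cluster state rather than by the validator, which is why the README asks for a manual switch back to ACTIVE.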
diff --git a/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTest.java b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTest.java
new file mode 100644
index 0000000..d8a3f7b
--- /dev/null
+++ b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTest.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils.RunnableX;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
+import static org.apache.ignite.plugin.cache.CacheTopologyValidatorPluginProvider.TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+@RunWith(Parameterized.class)
+public class IgniteCacheTopologyValidatorTest extends IgniteCacheTopologySplitAbstractTest {
+ /** */
+ private static final String LOCAL_HOST = "localhost";
+
+ /** */
+ private static final int CACHE_KEY_CNT = 1000;
+
+ /** */
+ public static final int CACHE_CNT = 2;
+
+ /** */
+ @Parameterized.Parameter
+ public boolean isPersistenceEnabled;
+
+ /** */
+ @Parameterized.Parameters(name = "isPersistenceEnabled={0}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[]{true}, new Object[]{false});
+ }
+
+ /** */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return getConfiguration(igniteInstanceName, true);
+ }
+
+ /** */
+ private IgniteConfiguration getConfiguration(
+ String igniteInstanceName,
+ boolean configureSegmentationResolverPlugin
+ ) throws Exception {
+ int idx = getTestIgniteInstanceIndex(igniteInstanceName);
+
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+ .setUserAttributes(singletonMap(IDX_ATTR, idx));
+
+ if (configureSegmentationResolverPlugin)
+ cfg.setPluginProviders(new CacheTopologyValidatorPluginProvider());
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi())
+ .setIpFinder(sharedStaticIpFinder)
+ .setLocalPortRange(1)
+ .setLocalPort(discoPort(idx))
+ .setConnectionRecoveryTimeout(0);
+
+ if (isPersistenceEnabled) {
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setInitialSize(100L * 1024 * 1024)
+ .setMaxSize(200L * 1024 * 1024)
+ .setPersistenceEnabled(true)));
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isBlocked(int locPort, int rmtPort) {
+ return isDiscoPort(locPort) && isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort);
+ }
+
+ /** */
+ private int segment(int discoPort) {
+ return (discoPort - DFLT_PORT) % 2 == 0 ? 0 : 1;
+ }
+
+ /** */
+ @Override public int segment(ClusterNode node) {
+ return node.<Integer>attribute(IDX_ATTR) % 2 == 0 ? 0 : 1;
+ }
+
+ /** */
+ @Test
+ public void testConnectionToIncompatibleCluster() throws Exception {
+ startGrid(getConfiguration(getTestIgniteInstanceName(0), false));
+
+ startGrid(1);
+
+ if (isPersistenceEnabled)
+ grid(0).cluster().state(ACTIVE);
+ else
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ assertTrue(waitForCondition(
+ () -> 0F == (Float)grid(1).context()
+ .distributedConfiguration()
+ .property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+ .get(),
+ getTestTimeout()
+ ));
+
+ splitAndWait();
+
+ connectNodeToSegment(3, false, 1);
+
+ assertTrue(waitForCondition(
+ () -> 0F == (Float)grid(3).context()
+ .distributedConfiguration()
+ .property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+ .get(),
+ getTestTimeout()
+ ));
+ }
+
+ /** */
+ @Test
+ public void testIncompatibleNodeConnection() throws Exception {
+ prepareCluster(1);
+
+ assertThrowsAnyCause(
+ log,
+ () -> startGrid(getConfiguration(getTestIgniteInstanceName(1), false)),
+ IgniteSpiException.class,
+ "The Topology Validator plugin is not configured for the server node that is trying to join the cluster."
+ );
+
+ startClientGrid(getConfiguration(getTestIgniteInstanceName(2), false));
+
+ assertEquals(2, grid(0).cluster().nodes().size());
+
+ checkPutGet(G.allGrids(), true);
+ }
+
+ /** */
+ @Test
+ public void testConnectionToSegmentedCluster() throws Exception {
+ prepareCluster(6);
+
+ stopGrid(4);
+ stopGrid(5);
+
+ splitAndWait();
+
+ checkPutGet(G.allGrids(), false);
+
+ connectNodeToSegment(4, false, 0);
+ connectNodeToSegment(6, true, 0);
+
+ checkPutGet(0, false);
+
+ connectNodeToSegment(5, false, 1);
+ connectNodeToSegment(7, true, 1);
+
+ checkPutGet(1, false);
+
+ stopSegmentNodes(1);
+
+ unsplit();
+
+ startGrid(1);
+
+ checkPutGet(G.allGrids(), false);
+ }
+
+ /** */
+ @Test
+ public void testRegularNodeStartStop() throws Exception {
+ prepareCluster(1);
+
+ checkPutGetAfter(() -> startGrid(1));
+ checkPutGetAfter(() -> stopGrid(1));
+
+ checkPutGetAfter(() -> startClientGrid(2));
+ checkPutGetAfter(() -> stopGrid(2));
+
+ startGrid(1);
+
+ grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+ checkPutGetAfter(() -> startGrid(3));
+ checkPutGetAfter(() -> stopGrid(3));
+
+ checkPutGetAfter(() -> stopGrid(1));
+
+ checkPutGetAfter(() -> startClientGrid(2));
+ checkPutGetAfter(() -> stopGrid(2));
+ }
+
+ /** */
+ @Test
+ public void testClientNodeSegmentationIgnored() throws Exception {
+ prepareCluster(1);
+
+ startClientGrid(1);
+
+ failNode(1, Collections.singleton(grid(0)));
+
+ checkPutGet(Collections.singleton(grid(0)), true);
+ }
+
+ /** */
+ @Test
+ public void testSplitWithoutBaseline() throws Exception {
+ Assume.assumeFalse(isPersistenceEnabled);
+
+ startGridsMultiThreaded(4);
+
+ createCaches();
+
+ splitAndWait();
+
+ checkPutGet(G.allGrids(), true);
+ }
+
+ /** */
+ @Test
+ public void testSplitWithBaseline() throws Exception {
+ prepareCluster(3);
+
+ startGrid(3);
+
+ splitAndWait();
+
+ connectNodeToSegment(4, true, 0);
+ connectNodeToSegment(5, true, 1);
+
+ checkPutGet(0, true);
+ checkPutGet(1, false);
+
+ assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(1).cluster().state(), getTestTimeout()));
+
+ stopSegmentNodes(1);
+ stopGrid(4);
+
+ unsplit();
+
+ startGrid(1);
+ startGrid(3);
+
+ grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPutGet(G.allGrids(), true);
+
+ splitAndWait();
+
+ checkPutGet(G.allGrids(), false);
+
+ assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(1).cluster().state(), getTestTimeout()));
+ assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(0).cluster().state(), getTestTimeout()));
+
+ grid(0).cluster().state(ACTIVE);
+
+ checkPutGet(0, true);
+ checkPutGet(1, false);
+ }
+
+ /** */
+ @Test
+ public void testConsequentSegmentationResolving() throws Exception {
+ prepareCluster(4);
+
+ splitAndWait();
+
+ checkPutGet(G.allGrids(), false);
+
+ grid(1).cluster().state(ACTIVE);
+
+ checkPutGet(0, false);
+ checkPutGet(1, true);
+
+ stopSegmentNodes(0);
+
+ unsplit();
+
+ failNode(1, Collections.singleton(grid(3)));
+
+ checkPutGet(Collections.singleton(grid(3)), false);
+
+ grid(3).cluster().state(ACTIVE);
+
+ checkPutGet(Collections.singleton(grid(3)), true);
+ }
+
+ /** */
+ @Test
+ public void testDeactivationThreshold() throws Exception {
+ prepareCluster(5);
+
+ grid(0).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+ .propagate(0.8F);
+
+ Collection<Ignite> segment = new ArrayList<>(G.allGrids());
+
+ segment.remove(grid(0));
+
+ failNode(0, segment);
+
+ checkPutGet(segment, false);
+
+ stopGrid(0);
+
+ grid(1).cluster().state(ACTIVE);
+
+ grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+ .propagate(0.5F);
+
+ segment.remove(grid(4));
+
+ failNode(4, segment);
+
+ checkPutGet(segment, true);
+ }
+
+ /** */
+ @Test
+ public void testValidationDisabled() throws Exception {
+ prepareCluster(4);
+
+ grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+ .propagate(0F);
+
+ splitAndWait();
+
+ connectNodeToSegment(4, true, 0);
+ connectNodeToSegment(5, true, 1);
+
+ checkPutGet(G.allGrids(), true);
+
+ stopSegmentNodes(0);
+
+ unsplit();
+
+ grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+ .propagate(0.5F);
+
+ failNode(1, Collections.singleton(grid(3)));
+
+ checkPutGet(Collections.singleton(grid(3)), false);
+ }
+
+ /** */
+ @Test
+ public void testNodeJoinWithHalfBaselineNodesLeft() throws Exception {
+ prepareCluster(4);
+
+ stopGrid(0);
+ stopGrid(1);
+ stopGrid(2);
+
+ checkPutGet(G.allGrids(), true);
+
+ startGrid(0);
+
+ checkPutGet(G.allGrids(), true);
+ }
+
+ /** */
+ @Test
+ public void testNodeJoinConcurrentWithLeftRejected() throws Exception {
+ prepareCluster(2);
+
+ CountDownLatch discoveryWorkerBlockedLatch = new CountDownLatch(1);
+
+ try {
+ grid(0).events().localListen(evt -> {
+ try {
+ discoveryWorkerBlockedLatch.await();
+ }
+ catch (InterruptedException e) {
+ U.error(log, e);
+
+ Thread.currentThread().interrupt();
+ }
+
+ return true;
+ }, EVT_NODE_JOINED);
+
+ startGrid(2);
+
+ stopGrid(1);
+
+ assertThrowsAnyCause(
+ log,
+ () -> startGrid(1),
+ IgniteSpiException.class,
+ "Node join request was rejected due to concurrent node left process handling"
+ );
+ }
+ finally {
+ discoveryWorkerBlockedLatch.countDown();
+ }
+ }
+
+ /** */
+ @Test
+ public void testPreconfiguredClusterState() throws Exception {
+ Assume.assumeFalse(isPersistenceEnabled);
+
+ startGrid(0);
+
+ startGrid(getConfiguration(getTestIgniteInstanceName(1)).setClusterStateOnStart(ACTIVE_READ_ONLY));
+
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ createCaches();
+
+ splitAndWait();
+
+ checkPutGet(G.allGrids(), false);
+ }
+
+ /** Starts a node whose discovery IP finder lists only the server nodes of the given segment. */
+ private IgniteEx connectNodeToSegment(int nodeIdx, boolean isClient, int segment) throws Exception {
+ IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(nodeIdx));
+
+ List<String> segmentDiscoPorts = segmentNodes(segment, false).stream()
+ .map(node -> LOCAL_HOST + ':' + discoPort(node.localNode().<Integer>attribute(IDX_ATTR)))
+ .collect(toList());
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(segmentDiscoPorts));
+
+ cfg.setClientMode(isClient);
+
+ return startGrid(cfg);
+ }
+
+ /** @return {@code true} if the port lies in the default discovery port range. */
+ private boolean isDiscoPort(int port) {
+ return port >= DFLT_PORT &&
+ port <= (DFLT_PORT + DFLT_PORT_RANGE);
+ }
+
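+ /** Creates {@code CACHE_CNT} replicated FULL_SYNC caches and checks that all nodes can write to them. */
+ private void createCaches() {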
+ for (int cacheIdx = 0; cacheIdx < CACHE_CNT; cacheIdx++) {
+ grid(0).createCache(new CacheConfiguration<>()
+ .setName(cacheName(cacheIdx))
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setCacheMode(REPLICATED)
+ .setReadFromBackup(false));
+ }
+
+ checkPutGet(G.allGrids(), true);
+ }
+
+ /** Runs the action, then checks that all nodes can still write to the caches. */
+ private void checkPutGetAfter(RunnableX r) {
+ r.run();
+
+ checkPutGet(G.allGrids(), true);
+ }
+
+ /** Checks cache operations on all nodes (including clients) of the given segment. */
+ private void checkPutGet(int segment, boolean successfulPutExpected) {
+ checkPutGet(segmentNodes(segment, true), successfulPutExpected);
+ }
+
+ /** @return Name of the test cache with the given index. */
+ private String cacheName(int cacheIdx) {
+ return DEFAULT_CACHE_NAME + '_' + cacheIdx;
+ }
+
+ /** @return Discovery port of the node with the given index. */
+ private int discoPort(int idx) {
+ return DFLT_PORT + idx;
+ }
+
+ /** Checks that puts succeed or are rejected as expected and that every key can still be read back. */
+ private void checkPutGet(Collection<? extends Ignite> nodes, boolean successfulPutExpected) {
+ assertFalse(nodes.isEmpty());
+
+ for (Ignite node : nodes) {
+ for (int cacheIdx = 0; cacheIdx < CACHE_CNT; cacheIdx++) {
+ IgniteCache<Object, Object> cache = node.cache(cacheName(cacheIdx));
+
+ for (int i = 0; i < CACHE_KEY_CNT; i++) {
+ int key = i;
+
+ if (!successfulPutExpected) {
+ assertThrowsAnyCause(
+ null,
+ () -> {
+ cache.put(key, key);
+
+ return null;
+ },
+ CacheInvalidStateException.class,
+ "Failed to perform cache operation");
+ }
+ else
+ cache.put(key, key);
+
+ assertEquals(key, cache.get(key));
+ }
+ }
+ }
+ }
+
+ /** Stops all nodes of the segment and, if persistence is enabled, removes their persistence data. */
+ private void stopSegmentNodes(int segment) throws Exception {
+ for (IgniteEx node : segmentNodes(segment, true)) {
+ if (isPersistenceEnabled) {
+ String pdsFolder = node.context().pdsFolderResolver().resolveFolders().folderName();
+
+ stopGrid(node.name());
+
+ cleanPersistenceDir(pdsFolder);
+ }
+ else
+ stopGrid(node.name());
+ }
+ }
+
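+ /** Simulates a node failure by disconnecting its discovery SPI and waits for the next exchange on the given nodes. */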
+ private void failNode(int idx, Collection<Ignite> awaitingNodes) {
+ assertFalse(awaitingNodes.isEmpty());
+
+ long topVer = awaitingNodes.iterator().next().cluster().topologyVersion();
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)grid(idx).context().discovery().getInjectedDiscoverySpi();
+
+ spi.setClientReconnectDisabled(true);
+ spi.disconnect();
+
+ awaitExchangeVersionFinished(awaitingNodes, topVer + 1);
+ }
+
+ /** @return Nodes of the given segment; client nodes are included only if requested. */
+ private Collection<IgniteEx> segmentNodes(int segment, boolean includeClients) {
+ return G.allGrids().stream()
+ .filter(ignite -> includeClients || !ignite.cluster().localNode().isClient())
+ .filter(ignite -> segment(ignite.cluster().localNode()) == segment)
+ .map(ignite -> (IgniteEx)ignite)
+ .collect(Collectors.toList());
+ }
+
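+ /** Starts the nodes, activates the cluster (persistence) or disables baseline auto-adjust (in-memory), creates caches. */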
+ private void prepareCluster(int nodes) throws Exception {
+ startGridsMultiThreaded(nodes);
+
+ if (isPersistenceEnabled)
+ grid(0).cluster().state(ACTIVE);
+ else
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ createCaches();
+ }
+}
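The expectations in testDeactivationThreshold and testValidationDisabled, and in the split scenarios, are consistent with a simple rule: after server nodes fail, a segment becomes read-only when the baseline nodes it still contains are no more than `threshold` times the baseline nodes alive before the failure, and a threshold of 0 disables the check. The class below is a hypothetical sketch of that rule inferred from the tests, not code from CacheTopologyValidatorPluginProvider; the class name, method name, parameters and the inclusive comparison are all assumptions. The tests also suggest that graceful stops (`stopGrid`) do not trigger the rule.

```java
/** Hypothetical deactivation rule inferred from the test expectations; not the plugin's implementation. */
public final class DeactivationRuleSketch {
    private DeactivationRuleSketch() {}

    /**
     * @param aliveBaselineNodes baseline nodes still present in the segment after the failure
     * @param prevAliveBaselineNodes baseline nodes alive before the failure
     * @param threshold deactivation threshold; 0 disables validation (as in testValidationDisabled)
     * @return true if the segment should switch to ACTIVE_READ_ONLY
     */
    public static boolean shouldDeactivate(int aliveBaselineNodes, int prevAliveBaselineNodes, float threshold) {
        // Validation disabled, or nothing to compare against.
        if (threshold <= 0F || prevAliveBaselineNodes == 0)
            return false;

        // Assumed inclusive: reaching exactly the threshold already deactivates the segment.
        return aliveBaselineNodes <= threshold * prevAliveBaselineNodes;
    }

    public static void main(String[] args) {
        // 1 of 5 nodes fails with threshold 0.8: segment goes read-only.
        System.out.println(shouldDeactivate(4, 5, 0.8F)); // true
        // 1 of 4 nodes fails with threshold 0.5: segment stays active.
        System.out.println(shouldDeactivate(3, 4, 0.5F)); // false
        // Even 2/2 split with threshold 0.5: both halves go read-only.
        System.out.println(shouldDeactivate(2, 4, 0.5F)); // true
        // Threshold 0: validation is disabled.
        System.out.println(shouldDeactivate(2, 4, 0F)); // false
    }
}
```

Making the comparison inclusive is what lets an even split deactivate both halves, which is what the split tests expect when neither side holds a majority.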
diff --git a/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTestSuite.java b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTestSuite.java
new file mode 100644
index 0000000..322da1f
--- /dev/null
+++ b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTestSuite.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.cache;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/** Ignite topology validator test suite. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({IgniteCacheTopologyValidatorTest.class})
+public class IgniteCacheTopologyValidatorTestSuite {
+}
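The suite runs IgniteCacheTopologyValidatorTest, two of whose scenarios concern join admission: testIncompatibleNodeConnection expects a server node without the plugin to be rejected while a client node without it may join, and testNodeJoinConcurrentWithLeftRejected expects a join to be rejected while a node-left event is still being processed. The hypothetical sketch below models only those two decisions. The class, the method and its boolean inputs are assumptions; the messages are the substrings the tests assert on (the real messages may be longer); and applying the concurrent-left check to client nodes as well is a guess.

```java
import java.util.Optional;

/** Hypothetical model of join admission as expected by the tests; not the plugin's implementation. */
public final class JoinAdmissionSketch {
    /** Substring asserted by testIncompatibleNodeConnection. */
    static final String NO_PLUGIN_MSG =
        "The Topology Validator plugin is not configured for the server node that is trying to join the cluster.";

    /** Substring asserted by testNodeJoinConcurrentWithLeftRejected. */
    static final String CONCURRENT_LEFT_MSG =
        "Node join request was rejected due to concurrent node left process handling";

    private JoinAdmissionSketch() {}

    /**
     * @param isClient whether the joining node is a client
     * @param hasPlugin whether the joining node has the validator plugin configured
     * @param nodeLeftInProgress whether a node-left event is still being handled
     * @return rejection message, or empty if the node may join
     */
    public static Optional<String> validateJoin(boolean isClient, boolean hasPlugin, boolean nodeLeftInProgress) {
        // Only server nodes are required to run the plugin; clients without it are admitted.
        if (!isClient && !hasPlugin)
            return Optional.of(NO_PLUGIN_MSG);

        // Assumption: any join racing with an unfinished node-left is rejected.
        if (nodeLeftInProgress)
            return Optional.of(CONCURRENT_LEFT_MSG);

        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validateJoin(false, false, false).isPresent()); // true: server without plugin
        System.out.println(validateJoin(true, false, false).isPresent()); // false: client without plugin
        System.out.println(validateJoin(false, true, true).isPresent()); // true: concurrent node left
    }
}
```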
diff --git a/pom.xml b/pom.xml
index 9aebb25..e99c1fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
<module>modules/azure-ext</module>
<module>modules/gce-ext</module>
<module>modules/zookeeper-ip-finder-ext</module>
+ <module>modules/topology-validator-ext</module>
</modules>
<profiles>