You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/11/02 10:45:05 UTC

[GitHub] [ignite] xtern commented on a change in pull request #9534: wip

xtern commented on a change in pull request #9534:
URL: https://github.com/apache/ignite/pull/9534#discussion_r740904230



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/validation/IgnitePluggableSegmentationResolver.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.validation;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UNDEFINED;
+
+/** */
+public class IgnitePluggableSegmentationResolver implements PluggableSegmentationResolver {
+   /** */
+    public static final String ATTR_SEG_RESOLVER_CONFIGURED = "org.apache.ignite.segmentation.resolver.configured";
+
+    /** */
+    public static final String SEG_RESOLVER_ENABLED_PROP_NAME = "org.apache.ignite.segmentation.resolver.enabled";
+
+    /** */
+    private static final String SEG_RESOLVER_THREAD_PREFIX = "segmentation-resolver";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> segResolverEnabledProp = new SimpleDistributedProperty<>(
+        SEG_RESOLVER_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** */
+    private final IgniteThreadPoolExecutor stateChangeExec;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /**  */
+    private volatile State state = State.VALID;
+
+    /** @param ctx Ignite kernel context. */
+    public IgnitePluggableSegmentationResolver(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        stateChangeExec = new IgniteThreadPoolExecutor(
+            SEG_RESOLVER_THREAD_PREFIX,
+            ctx.igniteInstanceName(),
+            1,
+            1,
+            DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            UNDEFINED,
+            new OomExceptionHandler(ctx));
+
+        stateChangeExec.allowCoreThreadTimeOut(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isValidSegment() {
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    public void start() {
+        if (ctx.clientNode())
+            return;
+
+        ctx.addNodeAttribute(ATTR_SEG_RESOLVER_CONFIGURED, true);
+
+        ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            new ClusterStateChangedEventListener()
+        );
+
+        ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(segResolverEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    setDefaultValue(segResolverEnabledProp, U.isLocalNodeCoordinator(ctx.discovery()), log);
+                }
+            });
+    }
+
+    /** @return Discovery data. */
+    public Serializable provideDiscoveryData() {
+        return state;
+    }
+
+    /** @param data Discovery data. */
+    public void onDiscoveryDataReceived(Serializable data) {
+        state = (State)data;
+    }
+
+    /** @param node Node. */
+    public void validateNewNode(ClusterNode node) {
+        if (node.isClient())
+            return;
+
+        if (!TRUE.equals(node.attribute(ATTR_SEG_RESOLVER_CONFIGURED))) {
+            throw new IgniteException( "The Segmentation Resolver plugin is not configured for the server node that is" +
+                " trying to join the cluster. Since the Segmentation Resolver is only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']');
+        }
+
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv))
+                        throw new IgniteException("Node join request will be rejected due to concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + ']');
+                }
+            }
+        }
+    }
+
+    /** */
+    private boolean isDisabled() {
+        Boolean res = segResolverEnabledProp.get();
+
+        return res == null || !res;
+    }
+
+    /** @return return. */
+    private String formatTopologyNodes(Collection<ClusterNode> nodes) {

Review comment:
       I suggest moving this method into the `TopologyChangedEventListener` class, since that is the only place where it is used.

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/validation/IgnitePluggableSegmentationResolver.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.validation;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UNDEFINED;
+
+/** */
+public class IgnitePluggableSegmentationResolver implements PluggableSegmentationResolver {
+   /** */
+    public static final String ATTR_SEG_RESOLVER_CONFIGURED = "org.apache.ignite.segmentation.resolver.configured";
+
+    /** */
+    public static final String SEG_RESOLVER_ENABLED_PROP_NAME = "org.apache.ignite.segmentation.resolver.enabled";
+
+    /** */
+    private static final String SEG_RESOLVER_THREAD_PREFIX = "segmentation-resolver";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> segResolverEnabledProp = new SimpleDistributedProperty<>(
+        SEG_RESOLVER_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** */
+    private final IgniteThreadPoolExecutor stateChangeExec;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /**  */
+    private volatile State state = State.VALID;
+
+    /** @param ctx Ignite kernel context. */
+    public IgnitePluggableSegmentationResolver(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        stateChangeExec = new IgniteThreadPoolExecutor(
+            SEG_RESOLVER_THREAD_PREFIX,
+            ctx.igniteInstanceName(),
+            1,
+            1,
+            DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            UNDEFINED,
+            new OomExceptionHandler(ctx));
+
+        stateChangeExec.allowCoreThreadTimeOut(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isValidSegment() {
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    public void start() {
+        if (ctx.clientNode())
+            return;
+
+        ctx.addNodeAttribute(ATTR_SEG_RESOLVER_CONFIGURED, true);
+
+        ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            new ClusterStateChangedEventListener()
+        );
+
+        ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(segResolverEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    setDefaultValue(segResolverEnabledProp, U.isLocalNodeCoordinator(ctx.discovery()), log);
+                }
+            });
+    }
+
+    /** @return Discovery data. */
+    public Serializable provideDiscoveryData() {
+        return state;
+    }
+
+    /** @param data Discovery data. */
+    public void onDiscoveryDataReceived(Serializable data) {
+        state = (State)data;
+    }
+
+    /** @param node Node. */
+    public void validateNewNode(ClusterNode node) {
+        if (node.isClient())
+            return;
+
+        if (!TRUE.equals(node.attribute(ATTR_SEG_RESOLVER_CONFIGURED))) {
+            throw new IgniteException( "The Segmentation Resolver plugin is not configured for the server node that is" +
+                " trying to join the cluster. Since the Segmentation Resolver is only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']');
+        }
+
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv))
+                        throw new IgniteException("Node join request will be rejected due to concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + ']');
+                }
+            }
+        }
+    }
+
+    /** */
+    private boolean isDisabled() {
+        Boolean res = segResolverEnabledProp.get();
+
+        return res == null || !res;
+    }
+
+    /** @return return. */
+    private String formatTopologyNodes(Collection<ClusterNode> nodes) {
+        return nodes.stream().map(n -> n.id().toString()).collect(Collectors.joining(", "));
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            lastCheckedTopVer = evt.topologyVersion();
+
+            if (isDisabled())
+                return;
+
+            if (state == State.VALID && evt.type() == EVT_NODE_FAILED) {
+                List<? extends BaselineNode> baselineNodes = discoCache.baselineNodes();
+
+                if (baselineNodes != null && aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1) {
+                    state = State.INVALID;
+
+                    stateChangeExec.execute(() -> {
+                        try {
+                            ctx.cluster().get().state(ACTIVE_READ_ONLY);
+                        }
+                        catch (Throwable e) {
+                            U.error(
+                                log,
+                                "Failed to automatically switch state of the segmented cluster to the READ-ONLY mode" +
+                                    " [segmentedNodes=" + formatTopologyNodes(discoCache.allNodes()) + "]. Cache writes" +
+                                    " are already restricted for all configured caches, but this step is still required" +
+                                    " in order to be able to unlock cache writes in the future. Retry this operation" +
+                                    " manually, if possible.",
+                                e
+                            );
+                        }
+                    });
+
+                    U.warn(log, "Cluster segmentation was detected [segmentedNodes=" +
+                        formatTopologyNodes(discoCache.allNodes()) + ']');
+                }
+            }
+
+            if (ctx.state().isBaselineAutoAdjustEnabled())
+                U.warn(log, "Segmentation Resolver requires baseline to be configured. If no baseline is" +
+                    " set, any topology change is considered valid.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+
+        /** */
+        private int aliveBaselineNodes(Collection<? extends BaselineNode> baselineNodes) {
+            int res = 0;
+
+            for (BaselineNode node : baselineNodes) {
+                if (!(node instanceof DetachedClusterNode))
+                    ++res;
+            }
+
+            return res;
+        }
+    }
+
+    /** */
+    private class ClusterStateChangedEventListener implements CustomEventListener<ChangeGlobalStateFinishMessage> {

Review comment:
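       In my opinion, it would be better to inline this class, e.g. as an anonymous class or a lambda passed directly to `setCustomEventListener` in `start()`, which is its only use site.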

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/validation/IgnitePluggableSegmentationResolver.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.validation;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UNDEFINED;
+
+/** */
+public class IgnitePluggableSegmentationResolver implements PluggableSegmentationResolver {
+   /** */
+    public static final String ATTR_SEG_RESOLVER_CONFIGURED = "org.apache.ignite.segmentation.resolver.configured";
+
+    /** */
+    public static final String SEG_RESOLVER_ENABLED_PROP_NAME = "org.apache.ignite.segmentation.resolver.enabled";
+
+    /** */
+    private static final String SEG_RESOLVER_THREAD_PREFIX = "segmentation-resolver";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> segResolverEnabledProp = new SimpleDistributedProperty<>(
+        SEG_RESOLVER_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** */
+    private final IgniteThreadPoolExecutor stateChangeExec;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /**  */
+    private volatile State state = State.VALID;
+
+    /** @param ctx Ignite kernel context. */
+    public IgnitePluggableSegmentationResolver(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        stateChangeExec = new IgniteThreadPoolExecutor(
+            SEG_RESOLVER_THREAD_PREFIX,
+            ctx.igniteInstanceName(),
+            1,
+            1,
+            DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            UNDEFINED,
+            new OomExceptionHandler(ctx));
+
+        stateChangeExec.allowCoreThreadTimeOut(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isValidSegment() {
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    public void start() {
+        if (ctx.clientNode())
+            return;
+
+        ctx.addNodeAttribute(ATTR_SEG_RESOLVER_CONFIGURED, true);
+
+        ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            new ClusterStateChangedEventListener()
+        );
+
+        ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(segResolverEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    setDefaultValue(segResolverEnabledProp, U.isLocalNodeCoordinator(ctx.discovery()), log);
+                }
+            });
+    }
+
+    /** @return Discovery data. */
+    public Serializable provideDiscoveryData() {
+        return state;
+    }
+
+    /** @param data Discovery data. */
+    public void onDiscoveryDataReceived(Serializable data) {
+        state = (State)data;
+    }
+
+    /** @param node Node. */
+    public void validateNewNode(ClusterNode node) {
+        if (node.isClient())
+            return;
+
+        if (!TRUE.equals(node.attribute(ATTR_SEG_RESOLVER_CONFIGURED))) {
+            throw new IgniteException( "The Segmentation Resolver plugin is not configured for the server node that is" +
+                " trying to join the cluster. Since the Segmentation Resolver is only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']');
+        }
+
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv))
+                        throw new IgniteException("Node join request will be rejected due to concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + ']');
+                }
+            }
+        }
+    }
+
+    /** */
+    private boolean isDisabled() {
+        Boolean res = segResolverEnabledProp.get();
+
+        return res == null || !res;
+    }
+
+    /** @return return. */
+    private String formatTopologyNodes(Collection<ClusterNode> nodes) {
+        return nodes.stream().map(n -> n.id().toString()).collect(Collectors.joining(", "));
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            lastCheckedTopVer = evt.topologyVersion();
+
+            if (isDisabled())
+                return;
+
+            if (state == State.VALID && evt.type() == EVT_NODE_FAILED) {
+                List<? extends BaselineNode> baselineNodes = discoCache.baselineNodes();
+
+                if (baselineNodes != null && aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1) {
+                    state = State.INVALID;
+
+                    stateChangeExec.execute(() -> {
+                        try {
+                            ctx.cluster().get().state(ACTIVE_READ_ONLY);
+                        }
+                        catch (Throwable e) {
+                            U.error(
+                                log,
+                                "Failed to automatically switch state of the segmented cluster to the READ-ONLY mode" +
+                                    " [segmentedNodes=" + formatTopologyNodes(discoCache.allNodes()) + "]. Cache writes" +
+                                    " are already restricted for all configured caches, but this step is still required" +
+                                    " in order to be able to unlock cache writes in the future. Retry this operation" +
+                                    " manually, if possible.",
+                                e
+                            );
+                        }
+                    });
+
+                    U.warn(log, "Cluster segmentation was detected [segmentedNodes=" +
+                        formatTopologyNodes(discoCache.allNodes()) + ']');
+                }
+            }
+
+            if (ctx.state().isBaselineAutoAdjustEnabled())

Review comment:
       As I understand it, the problem with auto-adjustment is not that the baseline has not been set, but that the baseline is updated before the resolver reads the current baseline nodes.
   For example, `testConsequentSegmentationResolving` can pass without disabling auto-adjustment if we set an auto-adjustment timeout: `srv.cluster().baselineAutoAdjustTimeout(1_000)`.
   
   So, I see two options here:
   1. When reading the baseline nodes, use the previous baseline (the one in effect before the last auto-adjustment triggered by the topology change), if that is possible.
   2. Change the warning message to say that baseline auto-adjustment is enabled and the segmentation resolver may not work at all. This option seems odd, because auto-adjustment is enabled by default in an in-memory cluster.

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/validation/IgnitePluggableSegmentationResolver.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.validation;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UNDEFINED;
+
+/** */
+public class IgnitePluggableSegmentationResolver implements PluggableSegmentationResolver {
+   /** */
+    public static final String ATTR_SEG_RESOLVER_CONFIGURED = "org.apache.ignite.segmentation.resolver.configured";
+
+    /** */
+    public static final String SEG_RESOLVER_ENABLED_PROP_NAME = "org.apache.ignite.segmentation.resolver.enabled";
+
+    /** */
+    private static final String SEG_RESOLVER_THREAD_PREFIX = "segmentation-resolver";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> segResolverEnabledProp = new SimpleDistributedProperty<>(
+        SEG_RESOLVER_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** */
+    private final IgniteThreadPoolExecutor stateChangeExec;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /**  */
+    private volatile State state = State.VALID;
+
+    /** @param ctx Ignite kernel context. */
+    public IgnitePluggableSegmentationResolver(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        stateChangeExec = new IgniteThreadPoolExecutor(
+            SEG_RESOLVER_THREAD_PREFIX,
+            ctx.igniteInstanceName(),
+            1,
+            1,
+            DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            UNDEFINED,
+            new OomExceptionHandler(ctx));
+
+        stateChangeExec.allowCoreThreadTimeOut(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isValidSegment() {
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    public void start() {
+        if (ctx.clientNode())
+            return;
+
+        ctx.addNodeAttribute(ATTR_SEG_RESOLVER_CONFIGURED, true);
+
+        ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            new ClusterStateChangedEventListener()
+        );
+
+        ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(segResolverEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    setDefaultValue(segResolverEnabledProp, U.isLocalNodeCoordinator(ctx.discovery()), log);
+                }
+            });
+    }
+
+    /** @return Discovery data. */
+    public Serializable provideDiscoveryData() {
+        return state;
+    }
+
+    /** @param data Discovery data. */
+    public void onDiscoveryDataReceived(Serializable data) {
+        state = (State)data;
+    }
+
+    /** @param node Node. */
+    public void validateNewNode(ClusterNode node) {
+        if (node.isClient())
+            return;
+
+        if (!TRUE.equals(node.attribute(ATTR_SEG_RESOLVER_CONFIGURED))) {
+            throw new IgniteException( "The Segmentation Resolver plugin is not configured for the server node that is" +
+                " trying to join the cluster. Since the Segmentation Resolver is only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']');
+        }
+
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv))
+                        throw new IgniteException("Node join request will be rejected due to concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + ']');
+                }
+            }
+        }
+    }
+
+    /** */
+    private boolean isDisabled() {
+        Boolean res = segResolverEnabledProp.get();
+
+        return res == null || !res;
+    }
+
+    /** @return return. */
+    private String formatTopologyNodes(Collection<ClusterNode> nodes) {
+        return nodes.stream().map(n -> n.id().toString()).collect(Collectors.joining(", "));
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            lastCheckedTopVer = evt.topologyVersion();
+
+            if (isDisabled())
+                return;
+
+            if (state == State.VALID && evt.type() == EVT_NODE_FAILED) {

Review comment:
       When we change the cluster state **to** read-only, the resolver state is treated as CLUSTER_WRITE_BLOCKED. However, if we start the cluster in read-only mode (via the `clusterStateOnStart` configuration property), the state is treated as "VALID", and this code block executes on segmentation.
   Maybe we should also check the cluster state here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org