You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/01/27 07:08:20 UTC

[ignite] branch master updated: IGNITE-12470 Pme-free switch feature should be deactivatable (#7304)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e6b97e  IGNITE-12470 Pme-free switch feature should be deactivatable (#7304)
0e6b97e is described below

commit 0e6b97edc1d6a1faba5efa10d69e6eb69c78ae24
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon Jan 27 10:07:57 2020 +0300

    IGNITE-12470 Pme-free switch feature should be deactivatable (#7304)
    
    Signed-off-by: Anton Vinogradov <av...@apache.org>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   3 +
 .../org/apache/ignite/internal/IgniteFeatures.java |   5 +
 .../internal/processors/cache/ExchangeContext.java |  17 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |   2 +-
 .../distributed/GridExchangeFreeSwitchTest.java    | 208 +++++++++++++++------
 5 files changed, 180 insertions(+), 55 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index b474eb9..22b998e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -207,6 +207,9 @@ public final class IgniteSystemProperties {
     /** */
     public static final String IGNITE_EXCHANGE_MERGE_DELAY = "IGNITE_EXCHANGE_MERGE_DELAY";
 
+    /** PME-free switch explicitly disabled. */
+    public static final String IGNITE_PME_FREE_SWITCH_DISABLED = "IGNITE_PME_FREE_SWITCH_DISABLED";
+
     /**
      * Name of the system property defining name of command line program.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 956d72f..fa6bd0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PME_FREE_SWITCH_DISABLED;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
 
 /**
@@ -166,6 +168,9 @@ public enum IgniteFeatures {
         final BitSet set = new BitSet();
 
         for (IgniteFeatures value : IgniteFeatures.values()) {
+            if (value == PME_FREE_SWITCH && getBoolean(IGNITE_PME_FREE_SWITCH_DISABLED))
+                continue;
+
             final int featureId = value.getFeatureId();
 
             assert !set.get(featureId) : "Duplicate feature ID found for [" + value + "] having same ID ["
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index e101f1a..f9e9376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -38,6 +39,9 @@ public class ExchangeContext {
     /** */
     public static final String IGNITE_EXCHANGE_COMPATIBILITY_VER_1 = "IGNITE_EXCHANGE_COMPATIBILITY_VER_1";
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /** Cache groups to request affinity for during local join exchange. */
     private Set<Integer> requestGrpsAffOnJoin;
 
@@ -57,16 +61,25 @@ public class ExchangeContext {
     private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
 
     /**
+     * @param cctx Context.
      * @param crd Coordinator flag.
      * @param fut Exchange future.
      */
-    public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+    public ExchangeContext(GridCacheSharedContext<?, ?> cctx, boolean crd, GridDhtPartitionsExchangeFuture fut) {
+        log = cctx.logger(getClass());
+
         int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
 
+        boolean allNodesSupportsPmeFreeSwitch = allNodesSupports(fut.firstEventCache().allNodes(), PME_FREE_SWITCH);
+
+        if (!allNodesSupportsPmeFreeSwitch)
+            log.warning("Current topology does not support the PME-free switch. Please check all nodes support" +
+                " this feature and it was not explicitly disabled by IGNITE_PME_FREE_SWITCH_DISABLED JVM option.");
+
         if (!compatibilityNode &&
             fut.wasRebalanced() &&
             fut.isBaselineNodeFailed() &&
-            allNodesSupports(fut.firstEventCache().allNodes(), PME_FREE_SWITCH)) {
+            allNodesSupportsPmeFreeSwitch) {
             exchangeFreeSwitch = true;
             merge = false;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 890e28b..40055f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -771,7 +771,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
-            exchCtx = new ExchangeContext(crdNode, this);
+            exchCtx = new ExchangeContext(cctx, crdNode, this);
 
             cctx.exchange().exchangerBlockingSectionBegin();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
index 7ca5fff..cbc2a49 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -47,7 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -55,7 +56,11 @@ import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PME_FREE_SWITCH_DISABLED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteFeatures.PME_FREE_SWITCH;
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
 
 /**
  *
@@ -81,16 +86,13 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
         cfg.setCacheConfiguration(cacheC != null ?
             cacheC.apply(igniteInstanceName) : new CacheConfiguration[] {cacheConfiguration()});
 
+        cfg.setClusterStateOnStart(ClusterState.INACTIVE);
+
         DataStorageConfiguration dsCfg = new DataStorageConfiguration();
 
         DataRegionConfiguration drCfg = new DataRegionConfiguration();
 
-        if (persistence) {
-            drCfg.setPersistenceEnabled(true);
-
-            cfg.setActiveOnStart(false);
-            cfg.setAutoActivationEnabled(false);
-        }
+        drCfg.setPersistenceEnabled(persistence);
 
         dsCfg.setDefaultDataRegionConfiguration(drCfg);
 
@@ -135,24 +137,54 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Checks Partition Exchange happen in case of baseline auto-adjust (in-memory cluster). It's not possible to
-     * perform switch since primaries may change.
+     * Checks that PME happens in case of baseline auto-adjust (in-memory cluster). It's not possible to perform the
+     * switch since primaries may change.
      */
     @Test
     public void testNonBaselineNodeLeftOnFullyRebalancedCluster() throws Exception {
-        testNodeLeftOnFullyRebalancedCluster();
+        testNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.NONE);
     }
 
     /**
-     * Checks Partition Exchange is absent in case of fixed baseline. It's possible to perform switch since primaries
-     * can't change.
+     * Checks PME is absent in case of fixed baseline. It's possible to perform switch since primaries can't change.
      */
     @Test
     public void testBaselineNodeLeftOnFullyRebalancedCluster() throws Exception {
+        testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.NONE);
+    }
+
+    /**
+     * Checks PME is absent/present when all nodes except the first one support the PME-free switch.
+     */
+    @Test
+    public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledFirstNode() throws Exception {
+        testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.FIRST);
+    }
+
+    /**
+     * Checks PME is absent/present when all nodes except the middle one support the PME-free switch.
+     */
+    @Test
+    public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledMiddleNode() throws Exception {
+        testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.MIDDLE);
+    }
+
+    /**
+     * Checks PME is absent/present when all nodes except the last one support the PME-free switch.
+     */
+    @Test
+    public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledLastNode() throws Exception {
+        testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.LAST);
+    }
+
+    /**
+     * Checks PME is absent/present in case of persistence enabled.
+     */
+    private void testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode order) throws Exception {
         persistence = true;
 
         try {
-            testNodeLeftOnFullyRebalancedCluster();
+            testNodeLeftOnFullyRebalancedCluster(order);
         }
         finally {
             persistence = false;
@@ -160,36 +192,75 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Starts node with PME-free feature explicitly disabled.
+     */
+    private void startNodeWithPmeFreeSwitchDisabled() throws Exception {
+        try {
+            System.setProperty(IGNITE_PME_FREE_SWITCH_DISABLED, "true");
+
+            Ignite ignite = startGrid(G.allGrids().size());
+
+            assertFalse(nodeSupports(ignite.cluster().localNode(), PME_FREE_SWITCH));
+        }
+        finally {
+            System.clearProperty(IGNITE_PME_FREE_SWITCH_DISABLED);
+        }
+    }
+
+    /**
      * Checks node left PME absent/present on fully rebalanced topology (Latest PME == LAA).
      */
-    private void testNodeLeftOnFullyRebalancedCluster() throws Exception {
+    private void testNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode disabled) throws Exception {
         int nodes = 10;
 
-        Ignite ignite = startGridsMultiThreaded(nodes, true);
+        switch (disabled) {
+            case FIRST:
+                startNodeWithPmeFreeSwitchDisabled();
 
-        ignite.cluster().active(true);
+                startGridsMultiThreaded(1, nodes - 1);
 
-        AtomicLong cnt = new AtomicLong();
+                break;
 
-        for (int i = 0; i < nodes; i++) {
-            TestRecordingCommunicationSpi spi =
-                (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+            case MIDDLE:
+                startGridsMultiThreaded(0, (nodes / 2) - 1);
 
-            spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
-                @Override public boolean apply(ClusterNode node, Message msg) {
-                    if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
-                        ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
-                        cnt.incrementAndGet();
+                startNodeWithPmeFreeSwitchDisabled();
 
-                    if (!persistence)
-                        return false;
+                int started = G.allGrids().size();
 
-                    return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
-                        msg.getClass().equals(GridDhtPartitionsFullMessage.class);
-                }
-            });
+                startGridsMultiThreaded(started, nodes - started);
+
+                break;
+
+            case LAST:
+                startGridsMultiThreaded(0, nodes - 1);
+
+                startNodeWithPmeFreeSwitchDisabled();
+
+                break;
+
+            case NONE:
+                startGridsMultiThreaded(0, nodes);
+
+                break;
+
+            default:
+                throw new UnsupportedOperationException();
         }
 
+        assertEquals(nodes, G.allGrids().size());
+
+        assertEquals(ClusterState.INACTIVE, grid(0).cluster().state());
+
+        grid(0).cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        AtomicLong singleCnt = new AtomicLong();
+        AtomicLong fullCnt = new AtomicLong();
+
+        startPmeMessagesCounting(nodes, singleCnt, fullCnt);
+
         Random r = new Random();
 
         while (nodes > 1) {
@@ -197,13 +268,41 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
 
             awaitPartitionMapExchange(true, true, null, true);
 
-            assertEquals(persistence ? 0 /*PME absent*/ : (nodes - 1) /*regular PME*/, cnt.get());
-
             IgniteEx alive = (IgniteEx)G.allGrids().get(0);
 
             assertTrue(alive.context().cache().context().exchange().lastFinishedFuture().rebalanced());
 
-            cnt.set(0);
+            boolean pmeFreeSwitch = persistence && allNodesSupports(alive.cluster().nodes(), PME_FREE_SWITCH);
+
+            assertEquals(pmeFreeSwitch ? 0 : (nodes - 1), singleCnt.get());
+            assertEquals(pmeFreeSwitch ? 0 : (nodes - 1), fullCnt.get());
+
+            singleCnt.set(0);
+            fullCnt.set(0);
+        }
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param signleCnt Counter for GridDhtPartitionsSingleMessage.
+     * @param fullCnt Counter for GridDhtPartitionsFullMessage.
+     */
+    private void startPmeMessagesCounting(int nodes, AtomicLong signleCnt, AtomicLong fullCnt) {
+        for (int i = 0; i < nodes; i++) {
+            TestRecordingCommunicationSpi spi =
+                (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+            spi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+                @Override public void apply(ClusterNode node, Message msg) {
+                    if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
+                        ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
+                        signleCnt.incrementAndGet();
+
+                    if (msg.getClass().equals(GridDhtPartitionsFullMessage.class) &&
+                        ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
+                        fullCnt.incrementAndGet();
+                }
+            });
         }
     }
 
@@ -258,25 +357,12 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
 
             int nodes = 4;
 
-            startGridsMultiThreaded(nodes, true).cluster().active(true);
-
-            AtomicLong cnt = new AtomicLong();
+            startGridsMultiThreaded(nodes);
 
-            for (int i = 0; i < nodes; i++) {
-                TestRecordingCommunicationSpi spi =
-                    (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+            AtomicLong singleCnt = new AtomicLong();
+            AtomicLong fullCnt = new AtomicLong();
 
-                spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
-                    @Override public boolean apply(ClusterNode node, Message msg) {
-                        if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
-                            ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
-                            cnt.incrementAndGet();
-
-                        return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
-                            msg.getClass().equals(GridDhtPartitionsFullMessage.class);
-                    }
-                });
-            }
+            startPmeMessagesCounting(nodes, singleCnt, fullCnt);
 
             Random r = new Random();
 
@@ -457,7 +543,8 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
 
             assertEquals(nodes - 1, pmeFreeCnt);
 
-            assertEquals(0, cnt.get());
+            assertEquals(0, singleCnt.get());
+            assertEquals(0, fullCnt.get());
         }
         finally {
             persistence = false;
@@ -599,4 +686,21 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
             return res;
         }
     }
+
+    /**
+     * Specifies node to start with IGNITE_PME_FREE_SWITCH_DISABLED JVM option.
+     */
+    private enum PmeFreeSwitchDisabledNode {
+        /** First. */
+        FIRST,
+
+        /** Middle. */
+        MIDDLE,
+
+        /** Last. */
+        LAST,
+
+        /** None. */
+        NONE
+    }
 }