You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/01/27 07:08:20 UTC
[ignite] branch master updated: IGNITE-12470 Pme-free switch feature should be deactivatable (#7304)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0e6b97e IGNITE-12470 Pme-free switch feature should be deactivatable (#7304)
0e6b97e is described below
commit 0e6b97edc1d6a1faba5efa10d69e6eb69c78ae24
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon Jan 27 10:07:57 2020 +0300
IGNITE-12470 Pme-free switch feature should be deactivatable (#7304)
Signed-off-by: Anton Vinogradov <av...@apache.org>
---
.../org/apache/ignite/IgniteSystemProperties.java | 3 +
.../org/apache/ignite/internal/IgniteFeatures.java | 5 +
.../internal/processors/cache/ExchangeContext.java | 17 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 2 +-
.../distributed/GridExchangeFreeSwitchTest.java | 208 +++++++++++++++------
5 files changed, 180 insertions(+), 55 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index b474eb9..22b998e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -207,6 +207,9 @@ public final class IgniteSystemProperties {
/** */
public static final String IGNITE_EXCHANGE_MERGE_DELAY = "IGNITE_EXCHANGE_MERGE_DELAY";
+ /** PME-free switch explicitly disabled. */
+ public static final String IGNITE_PME_FREE_SWITCH_DISABLED = "IGNITE_PME_FREE_SWITCH_DISABLED";
+
/**
* Name of the system property defining name of command line program.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 956d72f..fa6bd0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PME_FREE_SWITCH_DISABLED;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
/**
@@ -166,6 +168,9 @@ public enum IgniteFeatures {
final BitSet set = new BitSet();
for (IgniteFeatures value : IgniteFeatures.values()) {
+ if (value == PME_FREE_SWITCH && getBoolean(IGNITE_PME_FREE_SWITCH_DISABLED))
+ continue;
+
final int featureId = value.getFeatureId();
assert !set.get(featureId) : "Duplicate feature ID found for [" + value + "] having same ID ["
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index e101f1a..f9e9376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.HashSet;
import java.util.Set;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -38,6 +39,9 @@ public class ExchangeContext {
/** */
public static final String IGNITE_EXCHANGE_COMPATIBILITY_VER_1 = "IGNITE_EXCHANGE_COMPATIBILITY_VER_1";
+ /** Logger. */
+ private final IgniteLogger log;
+
/** Cache groups to request affinity for during local join exchange. */
private Set<Integer> requestGrpsAffOnJoin;
@@ -57,16 +61,25 @@ public class ExchangeContext {
private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
/**
+ * @param cctx Context.
* @param crd Coordinator flag.
* @param fut Exchange future.
*/
- public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+ public ExchangeContext(GridCacheSharedContext<?, ?> cctx, boolean crd, GridDhtPartitionsExchangeFuture fut) {
+ log = cctx.logger(getClass());
+
int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
+ boolean allNodesSupportsPmeFreeSwitch = allNodesSupports(fut.firstEventCache().allNodes(), PME_FREE_SWITCH);
+
+ if (!allNodesSupportsPmeFreeSwitch)
+ log.warning("Current topology does not support the PME-free switch. Please check all nodes support" +
+ " this feature and it was not explicitly disabled by IGNITE_PME_FREE_SWITCH_DISABLED JVM option.");
+
if (!compatibilityNode &&
fut.wasRebalanced() &&
fut.isBaselineNodeFailed() &&
- allNodesSupports(fut.firstEventCache().allNodes(), PME_FREE_SWITCH)) {
+ allNodesSupportsPmeFreeSwitch) {
exchangeFreeSwitch = true;
merge = false;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 890e28b..40055f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -771,7 +771,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
boolean crdNode = crd != null && crd.isLocal();
- exchCtx = new ExchangeContext(crdNode, this);
+ exchCtx = new ExchangeContext(cctx, crdNode, this);
cctx.exchange().exchangerBlockingSectionBegin();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
index 7ca5fff..cbc2a49 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -47,7 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -55,7 +56,11 @@ import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PME_FREE_SWITCH_DISABLED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteFeatures.PME_FREE_SWITCH;
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
/**
*
@@ -81,16 +86,13 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(cacheC != null ?
cacheC.apply(igniteInstanceName) : new CacheConfiguration[] {cacheConfiguration()});
+ cfg.setClusterStateOnStart(ClusterState.INACTIVE);
+
DataStorageConfiguration dsCfg = new DataStorageConfiguration();
DataRegionConfiguration drCfg = new DataRegionConfiguration();
- if (persistence) {
- drCfg.setPersistenceEnabled(true);
-
- cfg.setActiveOnStart(false);
- cfg.setAutoActivationEnabled(false);
- }
+ drCfg.setPersistenceEnabled(persistence);
dsCfg.setDefaultDataRegionConfiguration(drCfg);
@@ -135,24 +137,54 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
}
/**
- * Checks Partition Exchange happen in case of baseline auto-adjust (in-memory cluster). It's not possible to
- * perform switch since primaries may change.
+ * Checks PME happens in case of baseline auto-adjust (in-memory cluster). It's not possible to perform switch since
+ * primaries may change.
*/
@Test
public void testNonBaselineNodeLeftOnFullyRebalancedCluster() throws Exception {
- testNodeLeftOnFullyRebalancedCluster();
+ testNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.NONE);
}
/**
- * Checks Partition Exchange is absent in case of fixed baseline. It's possible to perform switch since primaries
- * can't change.
+ * Checks PME is absent in case of fixed baseline. It's possible to perform switch since primaries can't change.
*/
@Test
public void testBaselineNodeLeftOnFullyRebalancedCluster() throws Exception {
+ testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.NONE);
+ }
+
+ /**
+ * Checks PME is absent/present when all nodes except the first one support PME-free switch.
+ */
+ @Test
+ public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledFirstNode() throws Exception {
+ testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.FIRST);
+ }
+
+ /**
+ * Checks PME is absent/present when all nodes except the middle one support PME-free switch.
+ */
+ @Test
+ public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledMiddleNode() throws Exception {
+ testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.MIDDLE);
+ }
+
+ /**
+ * Checks PME is absent/present when all nodes except the last one support PME-free switch.
+ */
+ @Test
+ public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledLastNode() throws Exception {
+ testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.LAST);
+ }
+
+ /**
+ * Checks PME is absent/present in case of persistence enabled.
+ */
+ private void testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode order) throws Exception {
persistence = true;
try {
- testNodeLeftOnFullyRebalancedCluster();
+ testNodeLeftOnFullyRebalancedCluster(order);
}
finally {
persistence = false;
@@ -160,36 +192,75 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
}
/**
+ * Starts node with PME-free feature explicitly disabled.
+ */
+ private void startNodeWithPmeFreeSwitchDisabled() throws Exception {
+ try {
+ System.setProperty(IGNITE_PME_FREE_SWITCH_DISABLED, "true");
+
+ Ignite ignite = startGrid(G.allGrids().size());
+
+ assertFalse(nodeSupports(ignite.cluster().localNode(), PME_FREE_SWITCH));
+ }
+ finally {
+ System.clearProperty(IGNITE_PME_FREE_SWITCH_DISABLED);
+ }
+ }
+
+ /**
* Checks node left PME absent/present on fully rebalanced topology (Latest PME == LAA).
*/
- private void testNodeLeftOnFullyRebalancedCluster() throws Exception {
+ private void testNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode disabled) throws Exception {
int nodes = 10;
- Ignite ignite = startGridsMultiThreaded(nodes, true);
+ switch (disabled) {
+ case FIRST:
+ startNodeWithPmeFreeSwitchDisabled();
- ignite.cluster().active(true);
+ startGridsMultiThreaded(1, nodes - 1);
- AtomicLong cnt = new AtomicLong();
+ break;
- for (int i = 0; i < nodes; i++) {
- TestRecordingCommunicationSpi spi =
- (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+ case MIDDLE:
+ startGridsMultiThreaded(0, (nodes / 2) - 1);
- spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode node, Message msg) {
- if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
- ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
- cnt.incrementAndGet();
+ startNodeWithPmeFreeSwitchDisabled();
- if (!persistence)
- return false;
+ int started = G.allGrids().size();
- return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
- msg.getClass().equals(GridDhtPartitionsFullMessage.class);
- }
- });
+ startGridsMultiThreaded(started, nodes - started);
+
+ break;
+
+ case LAST:
+ startGridsMultiThreaded(0, nodes - 1);
+
+ startNodeWithPmeFreeSwitchDisabled();
+
+ break;
+
+ case NONE:
+ startGridsMultiThreaded(0, nodes);
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
}
+ assertEquals(nodes, G.allGrids().size());
+
+ assertEquals(ClusterState.INACTIVE, grid(0).cluster().state());
+
+ grid(0).cluster().state(ClusterState.ACTIVE);
+
+ awaitPartitionMapExchange();
+
+ AtomicLong singleCnt = new AtomicLong();
+ AtomicLong fullCnt = new AtomicLong();
+
+ startPmeMessagesCounting(nodes, singleCnt, fullCnt);
+
Random r = new Random();
while (nodes > 1) {
@@ -197,13 +268,41 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
awaitPartitionMapExchange(true, true, null, true);
- assertEquals(persistence ? 0 /*PME absent*/ : (nodes - 1) /*regular PME*/, cnt.get());
-
IgniteEx alive = (IgniteEx)G.allGrids().get(0);
assertTrue(alive.context().cache().context().exchange().lastFinishedFuture().rebalanced());
- cnt.set(0);
+ boolean pmeFreeSwitch = persistence && allNodesSupports(alive.cluster().nodes(), PME_FREE_SWITCH);
+
+ assertEquals(pmeFreeSwitch ? 0 : (nodes - 1), singleCnt.get());
+ assertEquals(pmeFreeSwitch ? 0 : (nodes - 1), fullCnt.get());
+
+ singleCnt.set(0);
+ fullCnt.set(0);
+ }
+ }
+
+ /**
+ * @param nodes Nodes.
+ * @param signleCnt Counter for GridDhtPartitionsSingleMessage.
+ * @param fullCnt Counter for GridDhtPartitionsFullMessage.
+ */
+ private void startPmeMessagesCounting(int nodes, AtomicLong signleCnt, AtomicLong fullCnt) {
+ for (int i = 0; i < nodes; i++) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+ spi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+ @Override public void apply(ClusterNode node, Message msg) {
+ if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
+ ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
+ signleCnt.incrementAndGet();
+
+ if (msg.getClass().equals(GridDhtPartitionsFullMessage.class) &&
+ ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
+ fullCnt.incrementAndGet();
+ }
+ });
}
}
@@ -258,25 +357,12 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
int nodes = 4;
- startGridsMultiThreaded(nodes, true).cluster().active(true);
-
- AtomicLong cnt = new AtomicLong();
+ startGridsMultiThreaded(nodes);
- for (int i = 0; i < nodes; i++) {
- TestRecordingCommunicationSpi spi =
- (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+ AtomicLong singleCnt = new AtomicLong();
+ AtomicLong fullCnt = new AtomicLong();
- spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode node, Message msg) {
- if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
- ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
- cnt.incrementAndGet();
-
- return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
- msg.getClass().equals(GridDhtPartitionsFullMessage.class);
- }
- });
- }
+ startPmeMessagesCounting(nodes, singleCnt, fullCnt);
Random r = new Random();
@@ -457,7 +543,8 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
assertEquals(nodes - 1, pmeFreeCnt);
- assertEquals(0, cnt.get());
+ assertEquals(0, singleCnt.get());
+ assertEquals(0, fullCnt.get());
}
finally {
persistence = false;
@@ -599,4 +686,21 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
return res;
}
}
+
+ /**
+ * Specifies node to start with IGNITE_PME_FREE_SWITCH_DISABLED JVM option.
+ */
+ private enum PmeFreeSwitchDisabledNode {
+ /** First. */
+ FIRST,
+
+ /** Middle. */
+ MIDDLE,
+
+ /** Last. */
+ LAST,
+
+ /** None. */
+ NONE
+ }
}