You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/01 09:05:28 UTC
[ignite] branch master updated: IGNITE-14394 Fixed baseline auto
adjustment triggered by merged exchanges. Fixes #8934
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 225f679 IGNITE-14394 Fixed baseline auto adjustment triggered by merged exchanges. Fixes #8934
225f679 is described below
commit 225f679bd0474e941fa8d90f050aad713e77f0b1
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Thu Apr 1 12:04:41 2021 +0300
IGNITE-14394 Fixed baseline auto adjustment triggered by merged exchanges. Fixes #8934
---
.../autoadjust/BaselineAutoAdjustData.java | 11 +-
.../autoadjust/BaselineAutoAdjustScheduler.java | 40 ++++-
.../autoadjust/BaselineTopologyUpdater.java | 8 +-
.../processors/cluster/BaselineAutoAdjustTest.java | 184 ++++++++++++++++++---
4 files changed, 209 insertions(+), 34 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java
index 07d01e3..79c608d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cluster.baseline.autoadjust;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
/**
* Container of required data for changing baseline.
*/
@@ -28,10 +30,10 @@ class BaselineAutoAdjustData {
private final long targetTopologyVersion;
/** {@code true} If this data don't actual anymore and it setting should be skipped. */
- private volatile boolean invalidated = false;
+ private volatile boolean invalidated;
/** {@code true} If this data was adjusted. */
- private volatile boolean adjusted = false;
+ private volatile boolean adjusted;
/**
* @param targetTopologyVersion Topology version nodes of which should be set by this task.
@@ -97,4 +99,9 @@ class BaselineAutoAdjustData {
return new BaselineAutoAdjustData(targetTopologyVersion);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(BaselineAutoAdjustData.class, this);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java
index 0e27195..a466d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java
@@ -57,16 +57,39 @@ class BaselineAutoAdjustScheduler {
}
/**
- * Adding task to queue with delay and remove previous one.
+ * Adds a new task to queue based on the given {@code baselineAutoAdjustData} with delay and remove previous one.
+ * A new task can be rejected in case of the given {@code baselineAutoAdjustData} is expired or
+ * the target topology version is less than the already scheduled version.
*
* @param baselineAutoAdjustData Data for changing baseline.
* @param delay Delay after which set baseline should be started.
+ * @return {@code true} If a new task was successfully scheduled.
*/
- public synchronized void schedule(BaselineAutoAdjustData baselineAutoAdjustData, long delay) {
- if (baselineTimeoutObj != null)
+ public synchronized boolean schedule(BaselineAutoAdjustData baselineAutoAdjustData, long delay) {
+ if (baselineAutoAdjustExecutor.isExecutionExpired(baselineAutoAdjustData)) {
+ if (log.isDebugEnabled())
+ log.debug("Baseline auto adjust data is expired (will not be scheduled) [data=" + baselineAutoAdjustData + ']');
+
+ return false;
+ }
+
+ if (baselineTimeoutObj != null) {
+ long targetVer = baselineAutoAdjustData.getTargetTopologyVersion();
+ long alreadyScheduledVer = baselineTimeoutObj.baselineAutoAdjustData.getTargetTopologyVersion();
+
+ if (alreadyScheduledVer > targetVer) {
+ if (log.isDebugEnabled()) {
+ log.debug("Baseline auto adjust data is targeted to obsolete version (will not be scheduled) " +
+ "[data=" + baselineAutoAdjustData + ", scheduled=" + baselineTimeoutObj.baselineAutoAdjustData + ']');
+ }
+
+ return false;
+ }
+
timeoutProcessor.removeTimeoutObject(baselineTimeoutObj);
+ }
- timeoutProcessor.addTimeoutObject(
+ boolean added = timeoutProcessor.addTimeoutObject(
baselineTimeoutObj = new BaselineMultiplyUseTimeoutObject(
baselineAutoAdjustData,
delay, baselineAutoAdjustExecutor,
@@ -74,12 +97,19 @@ class BaselineAutoAdjustScheduler {
log
)
);
+
+ if (log.isDebugEnabled()) {
+ log.debug("New baseline timeout object was " + (added ? "successfully scheduled" : "rejected") +
+ " [data=" + baselineTimeoutObj.baselineAutoAdjustData + ']');
+ }
+
+ return added;
}
/**
* @return Time of last scheduled task or -1 if it doesn't exist.
*/
- public long lastScheduledTaskTime() {
+ public synchronized long lastScheduledTaskTime() {
if (baselineTimeoutObj == null)
return -1;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java
index 3501bec..b2ffe26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java
@@ -124,11 +124,11 @@ public class BaselineTopologyUpdater {
long timeout = baselineConfiguration.getBaselineAutoAdjustTimeout();
- log.warning("Baseline auto-adjust will be executed in '" + timeout + "' ms");
-
- baselineAutoAdjustScheduler.schedule(baselineData, timeout);
+ // When exchanges are merged, the baseline data may already be expired (or target an older topology
+ // version than the one already scheduled); the scheduler then rejects it and nothing is logged here.
+ if (baselineAutoAdjustScheduler.schedule(baselineData, timeout))
+ log.warning("Baseline auto-adjust will be executed in '" + timeout + "' ms");
});
-
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
index dff853d..dcaf9a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
@@ -17,22 +17,35 @@
package org.apache.ignite.internal.processors.cluster;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.BaselineNode;
-import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -41,6 +54,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -60,6 +77,9 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
/** */
private static int autoAdjustTimeout = 5000;
+ /** Lifecycle bean set into the configuration of every node started afterwards (may be {@code null}). */
+ private LifecycleBean lifecycleBean;
+
/**
* @throws Exception if failed.
*/
@@ -100,6 +120,10 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(storageCfg);
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setLifecycleBeans(lifecycleBean);
+
return cfg;
}
@@ -109,6 +133,119 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
}
/**
+ * Tests that merging exchanges properly triggers baseline changing.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExchangeMerge() throws Exception {
+ // Latch that is released once the exchange worker has started PME for topVer == 3.0.
+ CountDownLatch exchangeWorkerLatch = new CountDownLatch(1);
+
+ // Lifecycle bean is needed in order to register an EVT_NODE_JOINED listener that is called
+ // right after GridCachePartitionExchangeManager and before GridClusterStateProcessor.
+ lifecycleBean = new LifecycleBean() {
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ /** {@inheritDoc} */
+ @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteException {
+ if (evt == LifecycleEventType.BEFORE_NODE_START) {
+ ignite.context().internalSubscriptionProcessor()
+ .registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() {
+ @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+ ignite.context().event().addDiscoveryEventListener((evt, disco) -> {
+ if (evt.type() == EVT_NODE_JOINED && evt.topologyVersion() == 3) {
+ try {
+ // Wait until the exchange worker starts PME for the first node that joined (topVer == 3).
+ exchangeWorkerLatch.await(getTestTimeout(), MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IgniteException("exchangeWorkerLatch has been interrupted.", e);
+ }
+ }
+ }, EVT_NODE_JOINED);
+ }
+ });
+ }
+ }
+ };
+
+ // Start the coordinator node.
+ IgniteEx crd = startGrid(0);
+
+ // This bean is only required on the coordinator node.
+ lifecycleBean = null;
+
+ // Latch indicates that EVT_NODE_JOINED (topVer == 4.0) was processed by all listeners.
+ CountDownLatch nodeJoinLatch = new CountDownLatch(1);
+
+ // This listener is the last one in the queue of handlers.
+ crd.context().event().addDiscoveryEventListener((evt, disco) -> {
+ if (evt.type() == EVT_NODE_JOINED && evt.topologyVersion() == 4)
+ nodeJoinLatch.countDown();
+ }, EVT_NODE_JOINED);
+
+ IgniteEx nonCrd = startGrid(1);
+
+ crd.cluster().state(ACTIVE);
+
+ crd.cluster().baselineAutoAdjustEnabled(true);
+ crd.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
+
+ awaitPartitionMapExchange(false, true, null);
+
+ TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(nonCrd);
+ spi1.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+
+ // Target major topology version (4 nodes)
+ long targetTopVer = 4;
+
+ // Let's block exchange process in order to merge two following exchanges (3.0 && 4.0).
+ crd.context()
+ .cache()
+ .context()
+ .exchange()
+ .mergeExchangesTestWaitVersion(new AffinityTopologyVersion(targetTopVer, 0), null);
+
+ AtomicInteger cnt = new AtomicInteger(G.allGrids().size());
+ runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(cnt.getAndIncrement());
+
+ return null;
+ }
+ }, 2, "async-grid-starter");
+
+ // Make sure that PME is in progress.
+ assertTrue("Failed to wait for PME [topVer=3]", spi1.waitForBlocked(1, getTestTimeout()));
+
+ assertTrue(
+ "Failed to wait for the first started exchange.",
+ waitForCondition(() -> {
+ GridDhtPartitionsExchangeFuture fut = crd.context().cache().context().exchange().lastTopologyFuture();
+ return fut.initialVersion().topologyVersion() == 3;
+ }, getTestTimeout()));
+
+ // This guarantees that BaselineAutoAdjustData listens to real GridDhtPartitionsExchangeFuture
+ // instead of readyAffinityFuture.
+ exchangeWorkerLatch.countDown();
+
+ assertTrue(
+ "Failed to wait for processing node join event [topVer=4]",
+ nodeJoinLatch.await(getTestTimeout(), MILLISECONDS));
+
+ // Unblock PME
+ spi1.stopBlock();
+
+ assertTrue(
+ "Failed to wait for changing baseline in " + autoAdjustTimeout * 2 + " ms.",
+ waitForCondition(() -> crd.cluster().currentBaselineTopology().size() == targetTopVer, autoAdjustTimeout * 2));
+ }
+
+ /**
* @throws Exception if failed.
*/
@Test
@@ -117,7 +254,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -148,7 +285,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -185,7 +322,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -224,7 +361,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -245,9 +382,10 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
* @return {@code true} if current baseline consist from one node.
*/
private boolean isCurrentBaselineFromOneNode(Ignite ignite0) {
- return ignite0.cluster().currentBaselineTopology().stream()
- .map(BaselineNode::consistentId)
- .allMatch(((IgniteEx)ignite0).localNode().consistentId()::equals);
+ Collection<BaselineNode> baseline = ignite0.cluster().currentBaselineTopology();
+ return baseline != null && baseline.stream()
+ .map(BaselineNode::consistentId)
+ .allMatch(((IgniteEx)ignite0).localNode().consistentId()::equals);
}
/**
@@ -259,7 +397,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
Ignite ignite0 = startGrids(2);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
Set<Object> initBaseline = ignite0.cluster().currentBaselineTopology().stream()
.map(BaselineNode::consistentId)
@@ -297,11 +435,11 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
*/
@Test(expected = BaselineAdjustForbiddenException.class)
public void testBaselineAutoAdjustThrowExceptionWhenBaselineChangedManually() throws Exception {
- Ignite ignite0 = startGrids(2);
+ IgniteEx ignite0 = startGrids(2);
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -311,7 +449,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
stopGrid(1);
- ignite0.cluster().setBaselineTopology(Arrays.asList(((IgniteEx)ignite0).localNode()));
+ ignite0.cluster().setBaselineTopology(Collections.singletonList(ignite0.localNode()));
}
/**
@@ -327,7 +465,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -341,7 +479,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0 = startGrids(2);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
Set<Object> baselineNodesAfterRestart = ignite0.cluster().currentBaselineTopology().stream()
.map(BaselineNode::consistentId)
@@ -370,7 +508,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
startGrid(1);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
@@ -402,7 +540,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(false);
//This activation guarantee that baseline would be set.
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(0);
@@ -442,7 +580,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustEnabled(false);
@@ -456,7 +594,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
public void shouldNodeWithPersistenceSuccessfullyJoinedToClusterWhenTimeoutGreaterThanZero() throws Exception {
IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustTimeout(1);
@@ -472,7 +610,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(true);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
startGrid(inMemoryConfiguration(1));
@@ -488,7 +626,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
public void shouldJoinSuccessBecauseClusterHasPersistentNode() throws Exception {
IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
ignite0.cluster().baselineAutoAdjustEnabled(false);
@@ -507,7 +645,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
startGrid(inMemoryConfiguration(1));
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ACTIVE);
try {
startGrid(persistentRegionConfiguration(2));
@@ -536,7 +674,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().baselineAutoAdjustEnabled(false);
- ignite0.cluster().state(ClusterState.ACTIVE);
+ ignite0.cluster().state(ACTIVE);
startGrid(1);