You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/01 09:05:28 UTC

[ignite] branch master updated: IGNITE-14394 Fixed baseline auto adjustment triggered by merged exchanges. Fixes #8934

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 225f679  IGNITE-14394 Fixed baseline auto adjustment triggered by merged exchanges. Fixes #8934
225f679 is described below

commit 225f679bd0474e941fa8d90f050aad713e77f0b1
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Thu Apr 1 12:04:41 2021 +0300

    IGNITE-14394 Fixed baseline auto adjustment triggered by merged exchanges. Fixes #8934
---
 .../autoadjust/BaselineAutoAdjustData.java         |  11 +-
 .../autoadjust/BaselineAutoAdjustScheduler.java    |  40 ++++-
 .../autoadjust/BaselineTopologyUpdater.java        |   8 +-
 .../processors/cluster/BaselineAutoAdjustTest.java | 184 ++++++++++++++++++---
 4 files changed, 209 insertions(+), 34 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java
index 07d01e3..79c608d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustData.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cluster.baseline.autoadjust;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
+
 /**
  * Container of required data for changing baseline.
  */
@@ -28,10 +30,10 @@ class BaselineAutoAdjustData {
     private final long targetTopologyVersion;
 
     /** {@code true} If this data don't actual anymore and it setting should be skipped. */
-    private volatile boolean invalidated = false;
+    private volatile boolean invalidated;
 
     /** {@code true} If this data was adjusted. */
-    private volatile boolean adjusted = false;
+    private volatile boolean adjusted;
 
     /**
      * @param targetTopologyVersion Topology version nodes of which should be set by this task.
@@ -97,4 +99,9 @@ class BaselineAutoAdjustData {
 
         return new BaselineAutoAdjustData(targetTopologyVersion);
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(BaselineAutoAdjustData.class, this);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java
index 0e27195..a466d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java
@@ -57,16 +57,39 @@ class BaselineAutoAdjustScheduler {
     }
 
     /**
-     * Adding task to queue with delay and remove previous one.
+     * Schedules a new task for the given {@code baselineAutoAdjustData} and removes the previously scheduled one.
+     * A new task is rejected if the given {@code baselineAutoAdjustData} is already expired or if its target
+     * topology version is less than the target version of the already scheduled task.
      *
      * @param baselineAutoAdjustData Data for changing baseline.
      * @param delay Delay after which set baseline should be started.
+     * @return {@code true} If a new task was successfully scheduled.
      */
-    public synchronized void schedule(BaselineAutoAdjustData baselineAutoAdjustData, long delay) {
-        if (baselineTimeoutObj != null)
+    public synchronized boolean schedule(BaselineAutoAdjustData baselineAutoAdjustData, long delay) {
+        if (baselineAutoAdjustExecutor.isExecutionExpired(baselineAutoAdjustData)) {
+            if (log.isDebugEnabled())
+                log.debug("Baseline auto adjust data is expired (will not be scheduled) [data=" + baselineAutoAdjustData + ']');
+
+            return false;
+        }
+
+        if (baselineTimeoutObj != null) {
+            long targetVer = baselineAutoAdjustData.getTargetTopologyVersion();
+            long alreadyScheduledVer = baselineTimeoutObj.baselineAutoAdjustData.getTargetTopologyVersion();
+
+            if (alreadyScheduledVer > targetVer) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Baseline auto adjust data is targeted to obsolete version (will not be scheduled) " +
+                        "[data=" + baselineAutoAdjustData + ", scheduled=" + baselineTimeoutObj.baselineAutoAdjustData + ']');
+                }
+
+                return false;
+            }
+
             timeoutProcessor.removeTimeoutObject(baselineTimeoutObj);
+        }
 
-        timeoutProcessor.addTimeoutObject(
+        boolean added = timeoutProcessor.addTimeoutObject(
             baselineTimeoutObj = new BaselineMultiplyUseTimeoutObject(
                 baselineAutoAdjustData,
                 delay, baselineAutoAdjustExecutor,
@@ -74,12 +97,19 @@ class BaselineAutoAdjustScheduler {
                 log
             )
         );
+
+        if (log.isDebugEnabled()) {
+            log.debug("New baseline timeout object was " + (added ? "successfully scheduled" : "rejected") +
+                " [data=" + baselineTimeoutObj.baselineAutoAdjustData + ']');
+        }
+
+        return added;
     }
 
     /**
      * @return Time of last scheduled task or -1 if it doesn't exist.
      */
-    public long lastScheduledTaskTime() {
+    public synchronized long lastScheduledTaskTime() {
         if (baselineTimeoutObj == null)
             return -1;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java
index 3501bec..b2ffe26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java
@@ -124,11 +124,11 @@ public class BaselineTopologyUpdater {
 
                         long timeout = baselineConfiguration.getBaselineAutoAdjustTimeout();
 
-                        log.warning("Baseline auto-adjust will be executed in '" + timeout + "' ms");
-
-                        baselineAutoAdjustScheduler.schedule(baselineData, timeout);
+                        // When exchanges are merged, this baseline data may already be expired: the scheduler
+                        // then rejects it (schedule() returns false) and the warning below is not logged.
+                        if (baselineAutoAdjustScheduler.schedule(baselineData, timeout))
+                            log.warning("Baseline auto-adjust will be executed in '" + timeout + "' ms");
                     });
-
             }
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
index dff853d..dcaf9a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
@@ -17,22 +17,35 @@
 
 package org.apache.ignite.internal.processors.cluster;
 
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.BaselineNode;
-import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -41,6 +54,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -60,6 +77,9 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
     /** */
     private static int autoAdjustTimeout = 5000;
 
+    /** Lifecycle bean passed to {@code setLifecycleBeans} of the node configuration; may be {@code null}. */
+    private LifecycleBean lifecycleBean;
+
     /**
      * @throws Exception if failed.
      */
@@ -100,6 +120,10 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         cfg.setDataStorageConfiguration(storageCfg);
 
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setLifecycleBeans(lifecycleBean);
+
         return cfg;
     }
 
@@ -109,6 +133,119 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that merging exchanges properly triggers baseline changing.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExchangeMerge() throws Exception {
+        // Latch that blocks the NODE_JOINED (topVer == 3) listener until the PME for topVer 3.0 has started.
+        CountDownLatch exchangeWorkerLatch = new CountDownLatch(1);
+
+        // Lifecycle bean is needed in order to register an EVT_NODE_JOINED listener that is called
+        // right after GridCachePartitionExchangeManager and before GridClusterStateProcessor.
+        lifecycleBean = new LifecycleBean() {
+            /** Ignite instance. */
+            @IgniteInstanceResource
+            IgniteEx ignite;
+
+            /** {@inheritDoc} */
+            @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteException {
+                if (evt == LifecycleEventType.BEFORE_NODE_START) {
+                    ignite.context().internalSubscriptionProcessor()
+                        .registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() {
+                            @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+                                ignite.context().event().addDiscoveryEventListener((discoEvt, disco) -> {
+                                    if (discoEvt.type() == EVT_NODE_JOINED && discoEvt.topologyVersion() == 3) {
+                                        try {
+                                            // Wait until the exchange worker starts the PME
+                                            // related to the first of the two concurrently joining nodes.
+                                            exchangeWorkerLatch.await(getTestTimeout(), MILLISECONDS);
+                                        }
+                                        catch (InterruptedException e) {
+                                            throw new IgniteException("exchangeWorkerLatch has been interrupted.", e);
+                                        }
+                                    }
+                                }, EVT_NODE_JOINED);
+                            }
+                        });
+                }
+            }
+        };
+
+        // Start the coordinator node.
+        IgniteEx crd = startGrid(0);
+
+        // This bean is only required on the coordinator node.
+        lifecycleBean = null;
+
+        // Latch indicates that EVT_NODE_JOINED (topVer == 4.0) was processed by all listeners.
+        CountDownLatch nodeJoinLatch = new CountDownLatch(1);
+
+        // This listener is the last one in the queue of handlers.
+        crd.context().event().addDiscoveryEventListener((evt, disco) -> {
+            if (evt.type() == EVT_NODE_JOINED && evt.topologyVersion() == 4)
+                nodeJoinLatch.countDown();
+        }, EVT_NODE_JOINED);
+
+        IgniteEx nonCrd = startGrid(1);
+
+        crd.cluster().state(ACTIVE);
+
+        crd.cluster().baselineAutoAdjustEnabled(true);
+        crd.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
+
+        awaitPartitionMapExchange(false, true, null);
+
+        TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(nonCrd);
+        spi1.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+
+        // Target major topology version (4 nodes)
+        long targetTopVer = 4;
+
+        // Block the exchange process in order to merge the two following exchanges (3.0 and 4.0).
+        crd.context()
+            .cache()
+            .context()
+            .exchange()
+            .mergeExchangesTestWaitVersion(new AffinityTopologyVersion(targetTopVer, 0), null);
+
+        AtomicInteger cnt = new AtomicInteger(G.allGrids().size());
+        runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(cnt.getAndIncrement());
+
+                return null;
+            }
+        }, 2, "async-grid-starter");
+
+        // Make sure that PME is in progress.
+        assertTrue("Failed to wait for PME [topVer=3]", spi1.waitForBlocked(1, getTestTimeout()));
+
+        assertTrue(
+            "Failed to wait for the first started exchange.",
+            waitForCondition(() -> {
+                GridDhtPartitionsExchangeFuture fut = crd.context().cache().context().exchange().lastTopologyFuture();
+                return fut.initialVersion().topologyVersion() == 3;
+            }, getTestTimeout()));
+
+        // This guarantees that BaselineAutoAdjustData listens to real GridDhtPartitionsExchangeFuture
+        // instead of readyAffinityFuture.
+        exchangeWorkerLatch.countDown();
+
+        assertTrue(
+            "Failed to wait for processing node join event [topVer=4]",
+            nodeJoinLatch.await(getTestTimeout(), MILLISECONDS));
+
+        // Unblock PME
+        spi1.stopBlock();
+
+        assertTrue(
+            "Failed to wait for changing baseline in " + autoAdjustTimeout * 2 + " ms.",
+            waitForCondition(() -> crd.cluster().currentBaselineTopology().size() == targetTopVer, autoAdjustTimeout * 2));
+    }
+
+    /**
      * @throws Exception if failed.
      */
     @Test
@@ -117,7 +254,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -148,7 +285,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -185,7 +322,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -224,7 +361,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -245,9 +382,10 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
      * @return {@code true} if current baseline consist from one node.
      */
     private boolean isCurrentBaselineFromOneNode(Ignite ignite0) {
-        return ignite0.cluster().currentBaselineTopology().stream()
-            .map(BaselineNode::consistentId)
-            .allMatch(((IgniteEx)ignite0).localNode().consistentId()::equals);
+        return ignite0.cluster().currentBaselineTopology() != null &&
+            ignite0.cluster().currentBaselineTopology().stream()
+                .map(BaselineNode::consistentId)
+                .allMatch(((IgniteEx)ignite0).localNode().consistentId()::equals);
     }
 
     /**
@@ -259,7 +397,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         Ignite ignite0 = startGrids(2);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         Set<Object> initBaseline = ignite0.cluster().currentBaselineTopology().stream()
             .map(BaselineNode::consistentId)
@@ -297,11 +435,11 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
      */
     @Test(expected = BaselineAdjustForbiddenException.class)
     public void testBaselineAutoAdjustThrowExceptionWhenBaselineChangedManually() throws Exception {
-        Ignite ignite0 = startGrids(2);
+        IgniteEx ignite0 = startGrids(2);
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -311,7 +449,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         stopGrid(1);
 
-        ignite0.cluster().setBaselineTopology(Arrays.asList(((IgniteEx)ignite0).localNode()));
+        ignite0.cluster().setBaselineTopology(Collections.singletonList(ignite0.localNode()));
     }
 
     /**
@@ -327,7 +465,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -341,7 +479,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0 = startGrids(2);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         Set<Object> baselineNodesAfterRestart = ignite0.cluster().currentBaselineTopology().stream()
             .map(BaselineNode::consistentId)
@@ -370,7 +508,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         startGrid(1);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
 
@@ -402,7 +540,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
             ignite0.cluster().baselineAutoAdjustEnabled(false);
 
             //This activation guarantee that baseline would be set.
-            ignite0.cluster().active(true);
+            ignite0.cluster().state(ACTIVE);
 
             ignite0.cluster().baselineAutoAdjustTimeout(0);
 
@@ -442,7 +580,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustEnabled(false);
 
@@ -456,7 +594,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
     public void shouldNodeWithPersistenceSuccessfullyJoinedToClusterWhenTimeoutGreaterThanZero() throws Exception {
         IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustTimeout(1);
 
@@ -472,7 +610,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(true);
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         startGrid(inMemoryConfiguration(1));
 
@@ -488,7 +626,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
     public void shouldJoinSuccessBecauseClusterHasPersistentNode() throws Exception {
         IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         ignite0.cluster().baselineAutoAdjustEnabled(false);
 
@@ -507,7 +645,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
         IgniteEx ignite0 = startGrid(inMemoryConfiguration(0));
         startGrid(inMemoryConfiguration(1));
 
-        ignite0.cluster().active(true);
+        ignite0.cluster().state(ACTIVE);
 
         try {
             startGrid(persistentRegionConfiguration(2));
@@ -536,7 +674,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
 
         ignite0.cluster().baselineAutoAdjustEnabled(false);
 
-        ignite0.cluster().state(ClusterState.ACTIVE);
+        ignite0.cluster().state(ACTIVE);
 
         startGrid(1);