You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/03/25 09:34:48 UTC

[ignite] branch master updated: IGNITE-16685 Fix expiration failure on cluster re-activation - Fixes #9888.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d3e5d  IGNITE-16685 Fix expiration failure on cluster re-activation - Fixes #9888.
c4d3e5d is described below

commit c4d3e5dde97fd994509dbf1ce6a3ddd3491f4a19
Author: vladsz83 <vl...@gmail.com>
AuthorDate: Fri Mar 25 12:32:45 2022 +0300

    IGNITE-16685 Fix expiration failure on cluster re-activation - Fixes #9888.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../cache/persistence/GridCacheOffheapManager.java |   4 +-
 .../expiry/ActivationOnExpirationTimeoutTest.java  | 148 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsMvccTestSuite4.java |   2 +
 .../ignite/testsuites/IgnitePdsTestSuite8.java     |   2 +
 4 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index eb6d0e0..9a64f11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -3095,7 +3095,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                 part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
 
                 // Skip non-owned partitions.
-                if (part == null || part.state() != OWNING)
+                if (part == null || part.state() != OWNING || !cctx.topology().initialized())
                     return 0;
             }
 
@@ -3106,7 +3106,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                     return 0;
 
                 try {
-                    if (part != null && part.state() != OWNING)
+                    if (part == null || part.state() != OWNING || !cctx.topology().initialized())
                         return 0;
 
                     long now = U.currentTimeMillis();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/ActivationOnExpirationTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/ActivationOnExpirationTimeoutTest.java
new file mode 100644
index 0000000..608538f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/ActivationOnExpirationTimeoutTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests cluster activation with expired records.
+ */
+public class ActivationOnExpirationTimeoutTest extends GridCommonAbstractTest {
+    /** */
+    private static final int TIMEOUT_MILLS = 500;
+
+    /** */
+    public volatile boolean delayTopologyUpdate;
+
+    /** */
+    public volatile boolean nodeFailed;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        CacheConfiguration<UUID, UUID> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+        cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, TIMEOUT_MILLS)));
+        cacheCfg.setBackups(2);
+
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TcpCommunicationSpi() {
+                @Override public void sendMessage(
+                    ClusterNode node,
+                    Message msg,
+                    IgniteInClosure<IgniteException> ackC
+                ) throws IgniteSpiException {
+                    if (delayTopologyUpdate && ((GridIoMessage)msg).message() instanceof GridDhtPartitionsFullMessage) {
+                        try {
+                            U.sleep(TIMEOUT_MILLS);
+                        }
+                        catch (IgniteInterruptedCheckedException ignore) {
+                            // No-op.
+                        }
+                    }
+
+                    super.sendMessage(node, msg, ackC);
+                }
+            })
+            .setFailureHandler(new FailureHandler() {
+                @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) {
+                    nodeFailed = true;
+
+                    return false;
+                }
+            })
+            .setCacheConfiguration(cacheCfg)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
+    }
+
+    /** Tests cluster activation with expired records. */
+    @Test
+    public void expirePolicyTest() throws Exception {
+        IgniteEx igniteEx = startGrids(2);
+
+        igniteEx.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Object, Object> cache = igniteEx.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 3_000; ++i) {
+            UUID uuid = UUID.randomUUID();
+
+            cache.put(uuid, uuid);
+        }
+
+        igniteEx.cluster().state(ClusterState.INACTIVE);
+
+        // For the rebalance.
+        startGrid(2);
+
+        // Wait for the expiration.
+        U.sleep(TIMEOUT_MILLS);
+
+        delayTopologyUpdate = true;
+
+        igniteEx.cluster().state(ClusterState.ACTIVE);
+
+        assertFalse(nodeFailed);
+        assertEquals(ClusterState.ACTIVE, G.allGrids().get(0).cluster().state());
+        assertEquals(ClusterState.ACTIVE, G.allGrids().get(1).cluster().state());
+        assertEquals(ClusterState.ACTIVE, G.allGrids().get(2).cluster().state());
+
+        GridTestUtils.waitForCondition(() -> cache.size() == 0, TIMEOUT_MILLS);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
index 7572287..8656942 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.expiry.ActivationOnExpirationTimeoutTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheEntriesExpirationTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationEncryptionTest;
@@ -72,6 +73,7 @@ public class IgnitePdsMvccTestSuite4 {
         ignoredTests.add(OffHeapLockLogTest.class);
         ignoredTests.add(OffHeapLockStackTest.class);
         ignoredTests.add(IgnitePdsCacheEntriesExpirationTest.class);
+        ignoredTests.add(ActivationOnExpirationTimeoutTest.class);
 
         // Defragmentation.
         ignoredTests.add(IgnitePdsDefragmentationTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java
index b9438d1..c8a8ae6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.ignite.cache.CircledRebalanceTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.CacheRebalanceWithRemovedWalSegment;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.SupplyPartitionHistoricallyWithReorderedUpdates;
+import org.apache.ignite.internal.processors.cache.expiry.ActivationOnExpirationTimeoutTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheEntriesExpirationTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsConsistencyOnDelayedPartitionOwning;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationEncryptionTest;
@@ -83,6 +84,7 @@ public class IgnitePdsTestSuite8 {
         GridTestUtils.addTestIfNeeded(suite, PageLockTrackerResourcesTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheEntriesExpirationTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ActivationOnExpirationTimeoutTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsConsistencyOnDelayedPartitionOwning.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, SupplyPartitionHistoricallyWithReorderedUpdates.class, ignoredTests);