You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/02/12 17:43:46 UTC

[ignite] branch master updated: IGNITE-12654 Some of rentingFutures in GridDhtPartitionTopologyImpl may accumulate a huge number of eviction callbacks - Fixes #7399.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e17887b  IGNITE-12654 Some of rentingFutures in GridDhtPartitionTopologyImpl may accumulate a huge number of eviction callbacks - Fixes #7399.
e17887b is described below

commit e17887bfbff7ddf8d58b9c376acca0382a184c15
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Wed Feb 12 20:43:26 2020 +0300

    IGNITE-12654 Some of rentingFutures in GridDhtPartitionTopologyImpl may accumulate a huge number of eviction callbacks - Fixes #7399.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../dht/topology/GridDhtLocalPartition.java        |  16 ++-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  28 +++--
 ...eScheduleResendPartitionsAfterEvictionTest.java | 135 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite7.java   |   2 +
 4 files changed, 171 insertions(+), 10 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index ff0b0f2..13b7811 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -653,12 +653,26 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return Future to signal that this node is no longer an owner or backup.
      */
     public IgniteInternalFuture<?> rent(boolean updateSeq) {
+        return rent(updateSeq, true);
+    }
+
+    /**
+     * Initiates partition eviction process.
+     *
+     * If the partition has reservations, eviction is delayed and continues after all reservations are released.
+     *
+     * @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished.
+     * @param alwaysReturnRentingFut If {@code true}, the renting future is always returned.
+     * @return Future to signal that this node is no longer an owner or backup, or {@code null} if the partition
+     * state is already {@code RENTING} or {@code EVICTED} and {@code alwaysReturnRentingFut} is {@code false}.
+     */
+    public IgniteInternalFuture<?> rent(boolean updateSeq, boolean alwaysReturnRentingFut) {
         long state0 = this.state.get();
 
         GridDhtPartitionState partState = getPartState(state0);
 
         if (partState == RENTING || partState == EVICTED)
-            return rent;
+            return alwaysReturnRentingFut ? rent : null;
 
         delayedRentingTopVer = ctx.exchange().readyAffinityVersion().topologyVersion();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 4156878..d1eae4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -2436,6 +2437,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @return {@code True} if there are local partitions need to be evicted.
      */
+    @SuppressWarnings("unchecked")
     private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
         if (!ctx.kernalContext().state().evictionsAllowed())
             return false;
@@ -2465,9 +2467,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                 GridDhtPartitionState state0 = part.state();
 
-                IgniteInternalFuture<?> rentFut = part.rent(false);
+                // There is no need to track a renting future of a partition which is already renting/evicted.
+                IgniteInternalFuture<?> rentFut = part.rent(false, false);
 
-                rentingFutures.add(rentFut);
+                if (rentFut != null)
+                    rentingFutures.add(rentFut);
 
                 updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion());
 
@@ -2496,9 +2500,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (locId.equals(n.id())) {
                             GridDhtPartitionState state0 = part.state();
 
-                            IgniteInternalFuture<?> rentFut = part.rent(false);
+                            // There is no need to track a renting future of a partition
+                            // which is already renting/evicted.
+                            IgniteInternalFuture<?> rentFut = part.rent(false, false);
 
-                            rentingFutures.add(rentFut);
+                            if (rentFut != null)
+                                rentingFutures.add(rentFut);
 
                             updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion());
 
@@ -2522,15 +2529,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (!rentingFutures.isEmpty()) {
             final AtomicInteger rentingPartitions = new AtomicInteger(rentingFutures.size());
 
-            for (IgniteInternalFuture<?> rentingFuture : rentingFutures) {
-                rentingFuture.listen(f -> {
+            IgniteInClosure c = new IgniteInClosure() {
+                @Override public void apply(Object o) {
                     int remaining = rentingPartitions.decrementAndGet();
 
                     if (remaining == 0) {
                         lock.writeLock().lock();
 
                         try {
-                            this.updateSeq.incrementAndGet();
+                            GridDhtPartitionTopologyImpl.this.updateSeq.incrementAndGet();
 
                             if (log.isDebugEnabled())
                                 log.debug("Partitions have been scheduled to resend [reason=" +
@@ -2542,8 +2549,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             lock.writeLock().unlock();
                         }
                     }
-                });
-            }
+                }
+            };
+
+            for (IgniteInternalFuture<?> rentingFuture : rentingFutures)
+                rentingFuture.listen(c);
         }
 
         return hasEvictedPartitions;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheScheduleResendPartitionsAfterEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheScheduleResendPartitionsAfterEvictionTest.java
new file mode 100644
index 0000000..009ab31
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheScheduleResendPartitionsAfterEvictionTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManagerAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Tests that eviction callbacks do not accumulate on partition renting futures.
+ */
+public class GridCacheScheduleResendPartitionsAfterEvictionTest extends PartitionsEvictManagerAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(false)));
+
+        CacheConfiguration cc = new CacheConfiguration(DEFAULT_CACHE_NAME).setBackups(1);
+
+        cfg.setCacheConfiguration(cc);
+
+        return cfg;
+    }
+
+    /**
+     * Checks that listeners which schedule partition resending after eviction are not added to a renting future
+     * uncontrollably. At most one listener per renting future is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRentingFuturesListenersNotGrowingUncontrollably() throws Exception {
+        IgniteEx node1 = startGrid(0);
+
+        startGrid(1);
+
+        node1.cluster().baselineAutoAdjustEnabled(false);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        subscribeEvictionQueueAtLatch(node1, latch, false);
+
+        node1.cluster().state(ACTIVE);
+
+        node1.getOrCreateCache(DEFAULT_CACHE_NAME);
+        try (IgniteDataStreamer streamer = node1.dataStreamer(DEFAULT_CACHE_NAME)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 100_000; i++)
+                streamer.addData(i, i);
+        }
+
+        final AtomicInteger nodeIdx = new AtomicInteger(1);
+
+        IgniteInternalFuture res = GridTestUtils.runMultiThreadedAsync(() -> {
+                try {
+                    startGrid(nodeIdx.incrementAndGet());
+                    resetBaselineTopology();
+                }
+                catch (Exception e) {
+                    fail("Failed to start rebalance.");
+                }
+            },
+            3,
+            "rebalanceThread");
+
+        // Give some time for rebalance to start.
+        Thread.sleep(3000);
+
+        latch.countDown();
+
+        // And some extra time for collecting callbacks.
+        Thread.sleep(100);
+
+        CacheGroupContext grpCtx = node1.context().cache().cacheGroup(CU.cacheId(DEFAULT_CACHE_NAME));
+
+        assertNotNull(grpCtx);
+
+        GridDhtPartitionTopologyImpl top = (GridDhtPartitionTopologyImpl)grpCtx.topology();
+
+        List<GridDhtLocalPartition> locParts = top.localPartitions();
+
+        for (GridDhtLocalPartition localPartition : locParts) {
+            GridFutureAdapter partRentFut = IgniteUtils.field(localPartition, "rent");
+
+            int lsnrCnt = 0;
+            for (Object waitNode = IgniteUtils.field(partRentFut, "state");
+                waitNode != null; waitNode = IgniteUtils.field(waitNode, "next")) {
+                if (IgniteUtils.field(waitNode, "val") != null)
+                    lsnrCnt++;
+            }
+
+            // At most one listener is expected.
+            assertTrue(lsnrCnt <= 1);
+        }
+
+        res.get();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 0bfbc4b..1cdd5fc 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartition
 import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePartitionLostWhileClearingTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheScheduleResendPartitionsAfterEvictionTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest;
@@ -97,6 +98,7 @@ public class IgniteCacheTestSuite7 {
         GridTestUtils.addTestIfNeeded(suite, Cache64kPartitionsTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingPartitionCountersTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingWithAsyncClearingTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, GridCacheScheduleResendPartitionsAfterEvictionTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheAssignmentNodeRestartsTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TxRollbackAsyncWithPersistenceTest.class, ignoredTests);