You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/08/05 21:06:05 UTC

[ignite] branch master updated: IGNITE-13302 Fixed partition divergence in the system cache that may occur due to java thin client connecting. Fixes #8096

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e7b874d  IGNITE-13302 Fixed partition divergence in the system cache that may occur due to java thin client connecting. Fixes #8096
e7b874d is described below

commit e7b874d4bc7cd41f113ace9826dfaf78f759ec5e
Author: mstepachev <ma...@gmail.com>
AuthorDate: Thu Aug 6 00:05:19 2020 +0300

    IGNITE-13302 Fixed partition divergence in the system cache that may occur due to java thin client connecting. Fixes #8096
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../processors/cache/GridCacheMapEntry.java        |   4 +-
 .../main/resources/META-INF/classnames.properties  |   1 +
 .../SysCacheInconsistencyInternalKeyTest.java      |  89 ++++++++++++++++++
 .../cache/persistence/db/RebalanceBlockingSPI.java | 102 +++++++++++++++++++++
 .../testframework/junits/GridAbstractTest.java     |   3 +
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   3 +
 6 files changed, 200 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 475d1f6..e92c00e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1830,7 +1830,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     topVer);
             }
 
-            deferred = cctx.deferredDelete() && !detached() && !isInternal();
+            deferred = cctx.deferredDelete() && !detached();
 
             if (intercept)
                 entry0.updateCounter(updateCntr0);
@@ -2770,7 +2770,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
             else {
-                if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
+                if (cctx.deferredDelete() && !isStartVersion() && !detached()) {
                     if (!deletedUnlocked()) {
                         update(null, 0L, 0L, ver, true);
 
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 76cc4d6..d4f4f01 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2456,3 +2456,4 @@ org.apache.ignite.transactions.TransactionState
 org.apache.ignite.transactions.TransactionTimeoutException
 org.apache.ignite.transactions.TransactionUnsupportedConcurrencyException
 org.apache.ignite.util.AttributeNodeFilter
+org.apache.ignite.internal.processors.cache.SysCacheInconsistencyInternalKeyTest$KeyUtility
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SysCacheInconsistencyInternalKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SysCacheInconsistencyInternalKeyTest.java
new file mode 100644
index 0000000..9952d2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SysCacheInconsistencyInternalKeyTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.GridTaskNameHashKey;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.db.RebalanceBlockingSPI;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ *
+ */
+public class SysCacheInconsistencyInternalKeyTest extends GridCommonAbstractTest {
+    /** Slow rebalance cache name. */
+    private static final String SLOW_REBALANCE_CACHE = UTILITY_CACHE_NAME;
+
+    /** Supply message latch. */
+    private static final AtomicReference<CountDownLatch> SUPPLY_MESSAGE_LATCH = new AtomicReference<>();
+
+    /** Supply send latch. */
+    private static final AtomicReference<CountDownLatch> SUPPLY_SEND_LATCH = new AtomicReference<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        cfg.setCommunicationSpi(new RebalanceBlockingSPI(SUPPLY_MESSAGE_LATCH, SLOW_REBALANCE_CACHE, SUPPLY_SEND_LATCH));
+
+        return cfg;
+    }
+
+    /**
+     * Checks that {@link GridCacheInternal} must be added to delete queue.
+     */
+    @Test
+    public void restartLeadToProblemWithDeletedQueue() throws Exception {
+        IgniteEx node1 = startGrid(0);
+        IgniteInternalCache<Object, Object> utilityCache = node1.context().cache().utilityCache();
+
+        for (int i = 0; i < 1000; i++)
+            utilityCache.putAsync(new GridTaskNameHashKey(i), "Obj").get();
+
+        CountDownLatch stopRebalanceLatch = new CountDownLatch(1);
+
+        CountDownLatch readyToSndLatch = new CountDownLatch(1);
+
+        SUPPLY_SEND_LATCH.set(readyToSndLatch);
+
+        SUPPLY_MESSAGE_LATCH.set(stopRebalanceLatch);
+
+        runAsync(() -> startGrid(1));
+
+        readyToSndLatch.await();
+
+        for (int i = 0; i < 1000; i++)
+            utilityCache.remove(new GridTaskNameHashKey(i));
+
+        stopRebalanceLatch.countDown();
+
+        awaitPartitionMapExchange(true, true, null);
+
+        assertFalse(idleVerify(node1, UTILITY_CACHE_NAME).hasConflicts());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/RebalanceBlockingSPI.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/RebalanceBlockingSPI.java
new file mode 100644
index 0000000..19424f5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/RebalanceBlockingSPI.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Test implementation for blocking rebalance process: supply messages of the given cache are held back
+ * before sending until the supply message latch is released.
+ */
+public class RebalanceBlockingSPI extends TcpCommunicationSpi {
+    /** Holder of the latch a supply message waits on before it is sent. */
+    private final AtomicReference<CountDownLatch> supplyMsgLatch;
+
+    /** Slow rebalance cache name. */
+    private final String cacheName;
+
+    /** Holder of the latch that is counted down when a supply message is about to be sent. */
+    private final AtomicReference<CountDownLatch> supplyMsgSndLatch;
+
+    /**
+     * @param supplyMsgLatch Supply message latch.
+     * @param cacheName Cache name.
+     * @param supplyMsgSndLatch Supply message sender latch.
+     */
+    public RebalanceBlockingSPI(
+        AtomicReference<CountDownLatch> supplyMsgLatch,
+        String cacheName,
+        AtomicReference<CountDownLatch> supplyMsgSndLatch
+    ) {
+        this.supplyMsgLatch = supplyMsgLatch;
+        this.cacheName = cacheName;
+        this.supplyMsgSndLatch = supplyMsgSndLatch;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+        processMessage(msg);
+
+        super.sendMessage(node, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(
+        ClusterNode node,
+        Message msg,
+        IgniteInClosure<IgniteException> ackC
+    ) throws IgniteSpiException {
+        processMessage(msg);
+
+        super.sendMessage(node, msg, ackC);
+    }
+
+    /**
+     * Counts down the sender latch (if set) when a supply message of the configured cache is about to be sent
+     * and then blocks the calling thread until the supply message latch (if set) is released.
+     * All other messages are ignored.
+     *
+     * @param msg Message.
+     * @throws IgniteException If the thread was interrupted while waiting; the interrupt flag is restored.
+     */
+    private void processMessage(Message msg) {
+        if (!(msg instanceof GridIoMessage))
+            return;
+
+        Message payload = ((GridIoMessage)msg).message();
+
+        if (!(payload instanceof GridDhtPartitionSupplyMessage))
+            return;
+
+        if (((GridCacheGroupIdMessage)payload).groupId() != CU.cacheId(cacheName))
+            return;
+
+        // Read the blocking latch before signalling, so the order of operations matches the signal contract.
+        CountDownLatch latch0 = supplyMsgLatch.get();
+
+        Optional.ofNullable(supplyMsgSndLatch.get()).ifPresent(CountDownLatch::countDown);
+
+        if (latch0 != null) {
+            try {
+                latch0.await();
+            }
+            catch (InterruptedException ex) {
+                // Restore the interrupt status that was cleared when the exception was thrown.
+                Thread.currentThread().interrupt();
+
+                throw new IgniteException(ex);
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 978f582..0280e9f 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1738,6 +1738,9 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
     }
 
     /**
+     * Returns a new instance of Ignite configuration.
+     * Be aware that this method is not called by {@link #startGrid(int)}.
+     *
      * @return Grid test configuration.
      * @throws Exception If failed.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 44c767c..68dca73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.PartitionsExchangeCoordinator
 import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest;
 import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
 import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
+import org.apache.ignite.internal.processors.cache.SysCacheInconsistencyInternalKeyTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerDiscoHistoryTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
@@ -156,6 +157,8 @@ public class IgniteCacheTestSuite6 {
 
         GridTestUtils.addTestIfNeeded(suite, PartitionsExchangeAwareTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, SysCacheInconsistencyInternalKeyTest.class, ignoredTests);
+
         return suite;
     }
 }