You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/08/05 21:06:05 UTC
[ignite] branch master updated: IGNITE-13302 Fixed partition
divergence in the system cache that may occur due to java thin client
connecting. Fixes #8096
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e7b874d IGNITE-13302 Fixed partition divergence in the system cache that may occur due to java thin client connecting. Fixes #8096
e7b874d is described below
commit e7b874d4bc7cd41f113ace9826dfaf78f759ec5e
Author: mstepachev <ma...@gmail.com>
AuthorDate: Thu Aug 6 00:05:19 2020 +0300
IGNITE-13302 Fixed partition divergence in the system cache that may occur due to java thin client connecting. Fixes #8096
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../processors/cache/GridCacheMapEntry.java | 4 +-
.../main/resources/META-INF/classnames.properties | 1 +
.../SysCacheInconsistencyInternalKeyTest.java | 89 ++++++++++++++++++
.../cache/persistence/db/RebalanceBlockingSPI.java | 102 +++++++++++++++++++++
.../testframework/junits/GridAbstractTest.java | 3 +
.../ignite/testsuites/IgniteCacheTestSuite6.java | 3 +
6 files changed, 200 insertions(+), 2 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 475d1f6..e92c00e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1830,7 +1830,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
topVer);
}
- deferred = cctx.deferredDelete() && !detached() && !isInternal();
+ deferred = cctx.deferredDelete() && !detached();
if (intercept)
entry0.updateCounter(updateCntr0);
@@ -2770,7 +2770,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else {
- if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
+ if (cctx.deferredDelete() && !isStartVersion() && !detached()) {
if (!deletedUnlocked()) {
update(null, 0L, 0L, ver, true);
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 76cc4d6..d4f4f01 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2456,3 +2456,4 @@ org.apache.ignite.transactions.TransactionState
org.apache.ignite.transactions.TransactionTimeoutException
org.apache.ignite.transactions.TransactionUnsupportedConcurrencyException
org.apache.ignite.util.AttributeNodeFilter
+org.apache.ignite.internal.processors.cache.SysCacheInconsistencyInternalKeyTest$KeyUtility
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SysCacheInconsistencyInternalKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SysCacheInconsistencyInternalKeyTest.java
new file mode 100644
index 0000000..9952d2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SysCacheInconsistencyInternalKeyTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.GridTaskNameHashKey;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.db.RebalanceBlockingSPI;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ *
+ */
+public class SysCacheInconsistencyInternalKeyTest extends GridCommonAbstractTest {
+ /** Slow rebalance cache name. */
+ private static final String SLOW_REBALANCE_CACHE = UTILITY_CACHE_NAME;
+
+ /** Supply message latch. */
+ private static final AtomicReference<CountDownLatch> SUPPLY_MESSAGE_LATCH = new AtomicReference<>();
+
+ /** Supply send latch. */
+ private static final AtomicReference<CountDownLatch> SUPPLY_SEND_LATCH = new AtomicReference<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+
+ cfg.setCommunicationSpi(new RebalanceBlockingSPI(SUPPLY_MESSAGE_LATCH, SLOW_REBALANCE_CACHE, SUPPLY_SEND_LATCH));
+
+ return cfg;
+ }
+
+ /**
+ * Checks that {@link GridCacheInternal} must be added to delete queue.
+ */
+ @Test
+ public void restartLeadToProblemWithDeletedQueue() throws Exception {
+ IgniteEx node1 = startGrid(0);
+ IgniteInternalCache<Object, Object> utilityCache = node1.context().cache().utilityCache();
+
+ for (int i = 0; i < 1000; i++)
+ utilityCache.putAsync(new GridTaskNameHashKey(i), "Obj").get();
+
+ CountDownLatch stopRebalanceLatch = new CountDownLatch(1);
+
+ CountDownLatch readyToSndLatch = new CountDownLatch(1);
+
+ SUPPLY_SEND_LATCH.set(readyToSndLatch);
+
+ SUPPLY_MESSAGE_LATCH.set(stopRebalanceLatch);
+
+ runAsync(() -> startGrid(1));
+
+ readyToSndLatch.await();
+
+ for (int i = 0; i < 1000; i++)
+ utilityCache.remove(new GridTaskNameHashKey(i));
+
+ stopRebalanceLatch.countDown();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ assertFalse(idleVerify(node1, UTILITY_CACHE_NAME).hasConflicts());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/RebalanceBlockingSPI.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/RebalanceBlockingSPI.java
new file mode 100644
index 0000000..19424f5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/RebalanceBlockingSPI.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Test implementation for blocking rebalance process.
+ */
+public class RebalanceBlockingSPI extends TcpCommunicationSpi {
+ /** Supply message latch. */
+ private final AtomicReference<CountDownLatch> supplyMsgLatch;
+
+ /** Slow rebalance cache name. */
+ private final String cacheName;
+
+ /** Supply message latch. */
+ private final AtomicReference<CountDownLatch> supplyMsgSndLatch;
+
+ /**
+ * @param supplyMsgLatch Supply message latch.
+ * @param cacheName Cache name.
+ * @param supplyMsgSndLatch Supply message sender latch.
+ */
+ public RebalanceBlockingSPI(
+ AtomicReference<CountDownLatch> supplyMsgLatch,
+ String cacheName,
+ AtomicReference<CountDownLatch> supplyMsgSndLatch
+ ) {
+ this.supplyMsgLatch = supplyMsgLatch;
+ this.cacheName = cacheName;
+ this.supplyMsgSndLatch = supplyMsgSndLatch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ processMessage(msg);
+
+ super.sendMessage(node, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ ClusterNode node,
+ Message msg,
+ IgniteInClosure<IgniteException> ackC
+ ) throws IgniteSpiException {
+ processMessage(msg);
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processMessage(Message msg) {
+ if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+ int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+ if (grpId == CU.cacheId(cacheName)) {
+ CountDownLatch latch0 = supplyMsgLatch.get();
+
+ Optional.ofNullable(supplyMsgSndLatch.get()).ifPresent(CountDownLatch::countDown);
+
+ if (latch0 != null)
+ try {
+ latch0.await();
+ }
+ catch (InterruptedException ex) {
+ throw new IgniteException(ex);
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 978f582..0280e9f 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1738,6 +1738,9 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
}
/**
+ * Returns a new instance of ignite configuration.
+ * Be aware that this method is not called by {@link #startGrid(int)}.
+ *
* @return Grid test configuration.
* @throws Exception If failed.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 44c767c..68dca73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.PartitionsExchangeCoordinator
import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest;
import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
+import org.apache.ignite.internal.processors.cache.SysCacheInconsistencyInternalKeyTest;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerDiscoHistoryTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
@@ -156,6 +157,8 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite, PartitionsExchangeAwareTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, SysCacheInconsistencyInternalKeyTest.class, ignoredTests);
+
return suite;
}
}