You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/04/02 08:00:21 UTC

[ignite] branch master updated: IGNITE-14375 Pending messages can be erroneously send (#8943)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8113ec0  IGNITE-14375 Pending messages can be erroneously send (#8943)
8113ec0 is described below

commit 8113ec0c6edfea35e57cfe17333041ba34ccc108
Author: Evgeniy Stanilovskiy <st...@gmail.com>
AuthorDate: Fri Apr 2 11:00:06 2021 +0300

    IGNITE-14375 Pending messages can be erroneously send (#8943)
---
 .../cache/CacheAffinitySharedManager.java          |  4 +-
 .../cache/DynamicCacheChangeRequest.java           |  2 +-
 .../internal/processors/cache/ExchangeActions.java |  4 +-
 .../cache/GatewayProtectedCacheProxy.java          |  2 +-
 .../processors/cache/GridCacheMapEntry.java        | 12 ++--
 .../cache/GridCachePartitionExchangeManager.java   |  2 +-
 .../processors/cache/GridCacheProcessor.java       |  2 +-
 .../cache/transactions/IgniteTxManager.java        |  3 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 65 +++++++++++-----------
 .../cache/CacheSerializableTransactionsTest.java   | 15 ++---
 .../junits/common/GridCommonAbstractTest.java      |  8 +--
 11 files changed, 58 insertions(+), 61 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f0f4491..ff906be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -639,12 +639,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * @param msg Change request.
      * @param topVer Current topology version.
-     * @param crd Coordinator flag.
      * @return Closed caches IDs.
      */
     private Set<Integer> processCacheCloseRequests(
         ClientCacheChangeDummyDiscoveryMessage msg,
-        boolean crd,
         AffinityTopologyVersion topVer
     ) {
         Set<String> cachesToClose = msg.cachesToClose();
@@ -706,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         // Check and close caches via dummy message.
         if (msg.cachesToClose() != null)
-            closedCaches = processCacheCloseRequests(msg, crd, topVer);
+            closedCaches = processCacheCloseRequests(msg, topVer);
 
         // Shedule change message.
         if (startedCaches != null || closedCaches != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 7f71c82..88d44cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -502,7 +502,7 @@ public class DynamicCacheChangeRequest implements Serializable {
             ", clientStartOnly=" + clientStartOnly +
             ", stop=" + stop +
             ", destroy=" + destroy +
-            ", disabledAfterStart" + disabledAfterStart +
+            ", disabledAfterStart=" + disabledAfterStart +
             ']';
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index b31c6f0..cbe7df4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -93,14 +93,14 @@ public class ExchangeActions {
      * @return New caches start requests.
      */
     public Collection<CacheActionData> cacheStartRequests() {
-        return cachesToStart != null ? cachesToStart.values() : Collections.<CacheActionData>emptyList();
+        return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
     }
 
     /**
      * @return Stop cache requests.
      */
     public Collection<CacheActionData> cacheStopRequests() {
-        return cachesToStop != null ? cachesToStop.values() : Collections.<CacheActionData>emptyList();
+        return cachesToStop != null ? cachesToStop.values() : Collections.emptyList();
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index 1b9e610..4c361d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -1599,7 +1599,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
             IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate;
 
             try {
-                IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().<K, V>publicJCache(context().name());
+                IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().publicJCache(context().name());
 
                 if (proxy != null) {
                     proxyImpl.opportunisticRestart(proxy.internalProxy());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0221a7..2fde831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1577,13 +1577,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             recordNodeId(affNodeId, topVer);
 
-            if (metrics && cctx.statisticsEnabled()) {
+            if (metrics && cctx.statisticsEnabled() && tx != null) {
                 cctx.cache().metrics0().onWrite();
 
-                T2<GridCacheOperation, CacheObject> entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue();
+                IgniteTxEntry txEntry = tx.entry(txKey());
 
-                if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
-                    cctx.cache().metrics0().onInvokeUpdate(old != null);
+                if (txEntry != null) {
+                    T2<GridCacheOperation, CacheObject> entryProcRes = txEntry.entryProcessorCalculatedValue();
+
+                    if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
+                        cctx.cache().metrics0().onInvokeUpdate(old != null);
+                }
             }
 
             if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5a80d8e..1f27819 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1048,7 +1048,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
 
-        assert grpDesc != null : grpId;
+        assert grpDesc != null : "grpId=" + grpId;
 
         CacheConfiguration<?, ?> ccfg = grpDesc.config();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index faa98de..7c1d13d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -287,7 +287,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Transaction interface implementation. */
     private IgniteTransactionsImpl transactions;
 
-    /** Pending cache starts. */
+    /** Pending cache operations. */
     private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();
 
     /** Template configuration add futures. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fb3a892..53320e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3202,7 +3202,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     for (Map.Entry<GridCacheMapEntry, Integer> info : store.entrySet()) {
                         GridCacheAdapter<Object, Object> cacheCtx = info.getKey().context().cache();
 
-                        metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
+                        if (cacheCtx != null)
+                            metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
                     }
 
                     store.clear();
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2b0858c..0c1b185 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2751,14 +2751,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     PendingMessage pm = new PendingMessage(msg);
 
                     this.msgs.add(pm);
-
-                    if (pm.customMsg && pm.id.equals(customDiscardId))
-                        this.customDiscardId = customDiscardId;
-
-                    if (!pm.customMsg && pm.id.equals(discardId))
-                        this.discardId = discardId;
                 }
             }
+
+            this.discardId = discardId;
+            this.customDiscardId = customDiscardId;
         }
 
         /**
@@ -6227,32 +6224,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) {
             if (isLocalNodeCoordinator()) {
-                boolean delayMsg;
-
-                assert ring.minimumNodeVersion() != null : ring;
-
-                boolean joiningEmpty;
-
-                synchronized (mux) {
-                    joiningEmpty = joiningNodes.isEmpty();
-                }
-
-                delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;
-
-                if (delayMsg) {
-                    if (log.isDebugEnabled()) {
-                        synchronized (mux) {
-                            log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
-                                ", joiningNodes=" + joiningNodes + ']');
-                        }
-                    }
-
-                    synchronized (mux) {
-                        pendingCustomMsgs.add(msg);
-                    }
-
+                if (posponeUndeliveredMessages(msg))
                     return;
-                }
 
                 if (!msg.verified()) {
                     msg.verify(getLocalNodeId());
@@ -6337,6 +6310,36 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * If a new node is in the process of being added, we must store and resend undelivered messages.
+         *
+         * @param msg Processed message.
+         * @return {@code true} if the message was appended to the pending queue.
+         */
+        private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
+            boolean joiningEmpty;
+
+            synchronized (mux) {
+                joiningEmpty = joiningNodes.isEmpty();
+            }
+
+            // Nothing to postpone unless the topology version is 0 and some nodes are still joining.
+            if (msg.topologyVersion() != 0L || joiningEmpty)
+                return false;
+
+            synchronized (mux) {
+                // Log only on the delay path; logging unconditionally would report
+                // delays that never happen.
+                if (log.isDebugEnabled())
+                    log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
+                        ", joiningNodes=" + joiningNodes + ']');
+
+                pendingCustomMsgs.add(msg);
+            }
+
+            return true;
+        }
+
+        /**
          * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the
          * ring and node detected failure left ring.
          */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 65859ca..07251ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -74,7 +74,6 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionOptimisticException;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -186,7 +185,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     private void txStreamerLoad(Ignite ignite,
         Integer key,
         String cacheName,
-        boolean allowOverwrite) throws Exception {
+        boolean allowOverwrite) {
         IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
 
         log.info("Test key: " + key);
@@ -2824,7 +2823,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
     @Test
     public void testReadWriteTransactionsNoDeadlock() throws Exception {
         checkReadWriteTransactionsNoDeadlock(false);
@@ -2833,7 +2831,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
     @Test
     public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception {
         checkReadWriteTransactionsNoDeadlock(true);
@@ -2844,8 +2841,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-9226");
-
         final Ignite ignite0 = ignite(0);
 
         for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
@@ -4140,7 +4135,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
 
             if (nonSer) {
                 nonSerFut = runMultiThreadedAsync(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
+                    @Override public Void call() {
                         int nodeIdx = idx.getAndIncrement() % clients.size();
 
                         Ignite node = clients.get(nodeIdx);
@@ -4198,7 +4193,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             }
 
             final IgniteInternalFuture<?> fut = runMultiThreadedAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
+                @Override public Void call() {
                     int nodeIdx = idx.getAndIncrement() % clients.size();
 
                     Ignite node = clients.get(nodeIdx);
@@ -4210,8 +4205,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
                     final IgniteTransactions txs = node.transactions();
 
                     final IgniteCache<Integer, Account> cache =
-                        nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
-                            node.<Integer, Account>cache(cacheName);
+                        nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<>()) :
+                            node.cache(cacheName);
 
                     assertNotNull(cache);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 95aea5b..7075635 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1359,10 +1359,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @param cnt Keys count.
      * @param startFrom Start value for keys search.
      * @return Collection of keys for which given cache is neither primary nor backup.
-     * @throws IgniteCheckedException If failed.
      */
-    protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom)
-        throws IgniteCheckedException {
+    protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) {
         return findKeys(cache, cnt, startFrom, 2);
     }
 
@@ -1549,10 +1547,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     /**
      * @param cache Cache.
      * @return Key for which given cache is neither primary nor backup.
-     * @throws IgniteCheckedException If failed.
      */
-    protected Integer nearKey(IgniteCache<?, ?> cache)
-        throws IgniteCheckedException {
+    protected Integer nearKey(IgniteCache<?, ?> cache) {
         return nearKeys(cache, 1, 1).get(0);
     }