You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2020/05/27 12:16:20 UTC

[ignite] 01/01: IGNITE-13082 Fix deadlock between topology update and CQ registration.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-13082
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 1f023f3b0c304c290d76ceab5f99b503d96779c2
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Oct 28 15:16:19 2019 +0300

    IGNITE-13082 Fix deadlock between topology update and CQ registration.
---
 .../continuous/CacheContinuousQueryHandler.java    | 14 +++++++++++
 .../continuous/CacheContinuousQueryListener.java   | 12 +++++++++-
 .../continuous/CacheContinuousQueryManager.java    | 28 +++++++++++++++-------
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 970116c..cc672c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -392,6 +392,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);
 
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
+            @Override public void onBeforeRegister() {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx != null && !cctx.isLocal())
+                    cctx.topology().readLock();
+            }
+
+            @Override public void onAfterRegister() {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx != null && !cctx.isLocal())
+                    cctx.topology().readUnlock();
+            }
+
             @Override public void onRegister() {
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index e534fdd..0c13672 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -41,8 +41,18 @@ public interface CacheContinuousQueryListener<K, V> {
         boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut);
 
     /**
+     *
+     */
+    public void onBeforeRegister();
+
+    /**
+     *
+     */
+    public void onAfterRegister();
+
+    /**
      * Listener registration callback.
-     * NOTE: This method should be called under the {@link CacheGroupContext#listenerLock} write lock held.
+     * NOTE: This method should be called under the {@link CacheGroupContext#listenerLock()}} write lock held.
      */
     public void onRegister();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index ecb32ea..5ce9945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -57,6 +57,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -982,24 +983,33 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
                 intLsnrCnt.incrementAndGet();
         }
         else {
-            cctx.group().listenerLock().writeLock().lock();
+            lsnr.onBeforeRegister();
 
             try {
-                added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
+                CacheGroupContext grp = cctx.group();
+
+                grp.listenerLock().writeLock().lock();
+
+                try {
+                    added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
 
-                if (added) {
-                    lsnrCnt.incrementAndGet();
+                    if (added) {
+                        lsnrCnt.incrementAndGet();
 
-                    lsnr.onRegister();
+                        lsnr.onRegister();
 
-                    if (lsnrCnt.get() == 1) {
-                        if (cctx.group().sharedGroup() && !cctx.isLocal())
-                            cctx.group().addCacheWithContinuousQuery(cctx);
+                        if (lsnrCnt.get() == 1) {
+                            if (grp.sharedGroup() && !cctx.isLocal())
+                                grp.addCacheWithContinuousQuery(cctx);
+                        }
                     }
                 }
+                finally {
+                    grp.listenerLock().writeLock().unlock();
+                }
             }
             finally {
-                cctx.group().listenerLock().writeLock().unlock();
+                lsnr.onAfterRegister();
             }
         }