You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2020/05/27 12:16:20 UTC
[ignite] 01/01: IGNITE-13082 Fix deadlock between topology update
and CQ registration.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-13082
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 1f023f3b0c304c290d76ceab5f99b503d96779c2
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Oct 28 15:16:19 2019 +0300
IGNITE-13082 Fix deadlock between topology update and CQ registration.
---
.../continuous/CacheContinuousQueryHandler.java | 14 +++++++++++
.../continuous/CacheContinuousQueryListener.java | 12 +++++++++-
.../continuous/CacheContinuousQueryManager.java | 28 +++++++++++++++-------
3 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 970116c..cc672c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -392,6 +392,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
+ @Override public void onBeforeRegister() {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ if (cctx != null && !cctx.isLocal())
+ cctx.topology().readLock();
+ }
+
+ @Override public void onAfterRegister() {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ if (cctx != null && !cctx.isLocal())
+ cctx.topology().readUnlock();
+ }
+
@Override public void onRegister() {
GridCacheContext<K, V> cctx = cacheContext(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index e534fdd..0c13672 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -41,8 +41,18 @@ public interface CacheContinuousQueryListener<K, V> {
boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut);
/**
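+ * Callback invoked before listener registration, prior to acquiring the cache group listener write lock.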
+ */
+ public void onBeforeRegister();
+
+ /**
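+ * Callback invoked after the registration attempt, once the cache group listener write lock has been released.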
+ */
+ public void onAfterRegister();
+
+ /**
* Listener registration callback.
- * NOTE: This method should be called under the {@link CacheGroupContext#listenerLock} write lock held.
+ * NOTE: This method should be called with the {@link CacheGroupContext#listenerLock()} write lock held.
*/
public void onRegister();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index ecb32ea..5ce9945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -57,6 +57,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -982,24 +983,33 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
intLsnrCnt.incrementAndGet();
}
else {
- cctx.group().listenerLock().writeLock().lock();
+ lsnr.onBeforeRegister();
try {
- added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
+ CacheGroupContext grp = cctx.group();
+
+ grp.listenerLock().writeLock().lock();
+
+ try {
+ added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
- if (added) {
- lsnrCnt.incrementAndGet();
+ if (added) {
+ lsnrCnt.incrementAndGet();
- lsnr.onRegister();
+ lsnr.onRegister();
- if (lsnrCnt.get() == 1) {
- if (cctx.group().sharedGroup() && !cctx.isLocal())
- cctx.group().addCacheWithContinuousQuery(cctx);
+ if (lsnrCnt.get() == 1) {
+ if (grp.sharedGroup() && !cctx.isLocal())
+ grp.addCacheWithContinuousQuery(cctx);
+ }
}
}
+ finally {
+ grp.listenerLock().writeLock().unlock();
+ }
}
finally {
- cctx.group().listenerLock().writeLock().unlock();
+ lsnr.onAfterRegister();
}
}