You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/02/07 12:46:16 UTC

[ignite] branch master updated: IGNITE-10755: MVCC: Fix CQ registration in the middle of tx. This closes #5841.

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 937164e  IGNITE-10755: MVCC: Fix CQ registration in the middle of tx. This closes #5841.
937164e is described below

commit 937164ea1a922ae32e54aa32b7356ef758599e87
Author: rkondakov <ko...@mail.ru>
AuthorDate: Thu Feb 7 15:45:49 2019 +0300

    IGNITE-10755: MVCC: Fix CQ registration in the middle of tx. This closes #5841.
---
 .../ignite/internal/GridEventConsumeHandler.java   |   6 +
 .../ignite/internal/GridMessageListenHandler.java  |   6 +
 .../processors/cache/CacheGroupContext.java        |  73 ++++--
 .../processors/cache/GridCacheContext.java         |  10 +
 .../processors/cache/GridCacheMapEntry.java        |   4 +-
 .../dht/GridDhtTxAbstractEnlistFuture.java         |   2 +-
 .../processors/cache/mvcc/MvccCachingManager.java  | 280 +++++++++++++--------
 .../continuous/CacheContinuousQueryHandler.java    |  58 ++++-
 .../continuous/CacheContinuousQueryListener.java   |   7 +
 .../continuous/CacheContinuousQueryManager.java    |  38 +--
 .../cache/transactions/IgniteTxHandler.java        |   2 +-
 .../IgniteTxImplicitSingleStateImpl.java           |  12 +
 .../transactions/IgniteTxRemoteStateAdapter.java   |  15 +-
 .../cache/transactions/IgniteTxState.java          |   8 +
 .../cache/transactions/IgniteTxStateImpl.java      |  15 +-
 .../processors/cache/transactions/TxCounters.java  |   2 +-
 .../continuous/GridContinuousHandler.java          |   5 +
 .../continuous/GridContinuousProcessor.java        |   3 +-
 .../eviction/paged/PageEvictionMetricTest.java     |   2 +-
 .../PageEvictionPagesRecyclingAndReusingTest.java  |   4 +-
 .../eviction/paged/PageEvictionTouchOrderTest.java |   4 +-
 .../PageEvictionWithRebalanceAbstractTest.java     |   2 +-
 ...ntinuousQueryConcurrentPartitionUpdateTest.java |   5 +-
 .../CacheContinuousQueryExecuteInPrimaryTest.java  |  30 ++-
 24 files changed, 411 insertions(+), 182 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 58d0d9a..e15cdd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
@@ -142,6 +143,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+        return Collections.emptyMap();
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index c146eca..c6a2d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -132,6 +133,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+        return Collections.emptyMap();
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         ctx.io().addUserMessageListener(topic, pred);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 46de040..4cdcf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -114,7 +116,11 @@ public class CacheGroupContext {
     /** We modify content under lock, by making defencive copy, field always contains unmodifiable list. */
     private volatile List<GridCacheContext> caches = Collections.unmodifiableList(new ArrayList<>());
 
-    private volatile List<GridCacheContext> contQryCaches;
+    /** List of caches with registered CQ listeners. */
+    private List<GridCacheContext> contQryCaches;
+
+    /** ReadWriteLock to control the continuous query setup - this is to prevent the race between cache update and listener setup */
+    private final ReentrantReadWriteLock listenerLock = new ReentrantReadWriteLock();
 
     /** */
     private final IgniteLogger log;
@@ -883,17 +889,16 @@ public class CacheGroupContext {
         assert sharedGroup() : cacheOrGroupName();
         assert cctx.group() == this : cctx.name();
         assert !cctx.isLocal() : cctx.name();
+        assert listenerLock.writeLock().isHeldByCurrentThread();
 
-        synchronized (this) {
-            List<GridCacheContext> contQryCaches = this.contQryCaches;
+        List<GridCacheContext> contQryCaches = this.contQryCaches;
 
-            if (contQryCaches == null)
-                contQryCaches = new ArrayList<>();
+        if (contQryCaches == null)
+            contQryCaches = new ArrayList<>();
 
-            contQryCaches.add(cctx);
+        contQryCaches.add(cctx);
 
-            this.contQryCaches = contQryCaches;
-        }
+        this.contQryCaches = contQryCaches;
     }
 
     /**
@@ -903,20 +908,30 @@ public class CacheGroupContext {
         assert sharedGroup() : cacheOrGroupName();
         assert cctx.group() == this : cctx.name();
         assert !cctx.isLocal() : cctx.name();
+        assert listenerLock.isWriteLockedByCurrentThread();
 
-        synchronized (this) {
-            List<GridCacheContext> contQryCaches = this.contQryCaches;
+        List<GridCacheContext> contQryCaches = this.contQryCaches;
 
-            if (contQryCaches == null)
-                return;
+        if (contQryCaches == null)
+            return;
 
-            contQryCaches.remove(cctx);
+        contQryCaches.remove(cctx);
 
-            if (contQryCaches.isEmpty())
-                contQryCaches = null;
+        if (contQryCaches.isEmpty())
+            contQryCaches = null;
 
-            this.contQryCaches = contQryCaches;
-        }
+        this.contQryCaches = contQryCaches;
+    }
+
+
+    /**
+     * Obtain the group listeners lock. Write lock should be held to register/unregister listeners. Read lock should be
+     * hel for CQ listeners notification.
+     *
+     * @return Lock for the CQ listeners.
+     */
+    public ReadWriteLock listenerLock() {
+        return listenerLock;
     }
 
     /**
@@ -935,7 +950,16 @@ public class CacheGroupContext {
         if (isLocal())
             return;
 
-        List<GridCacheContext> contQryCaches = this.contQryCaches;
+        List<GridCacheContext> contQryCaches;
+
+        listenerLock.readLock().lock();
+
+        try {
+            contQryCaches = this.contQryCaches;
+        }
+        finally {
+            listenerLock.readLock().unlock();
+        }
 
         if (contQryCaches == null)
             return;
@@ -965,7 +989,18 @@ public class CacheGroupContext {
      * @return {@code True} if there is at least one cache with registered CQ exists in this group.
      */
     public boolean hasContinuousQueryCaches() {
-        return !F.isEmpty(contQryCaches);
+        List<GridCacheContext> contQryCaches;
+
+        listenerLock.readLock().lock();
+
+        try {
+            contQryCaches = this.contQryCaches;
+
+            return !F.isEmpty(contQryCaches);
+        }
+        finally {
+            listenerLock.readLock().unlock();
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 2bdb275..e0b5ee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -2321,6 +2322,15 @@ public class GridCacheContext<K, V> implements Externalizable {
             near().dht().context().statisticsEnabled = statisticsEnabled;
     }
 
+    /**
+     * @param tx Transaction.
+     * @return {@code True} if it is need to notify continuous query listeners.
+     */
+    public boolean hasContinuousQueryListeners(@Nullable IgniteInternalTx tx) {
+        return grp.sharedGroup() ? grp.hasContinuousQueryCaches() :
+            contQryMgr.notifyContinuousQueries(tx) && !F.isEmpty(contQryMgr.updateListeners(false, false));
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, igniteInstanceName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 373c4b3..4753e9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -257,7 +257,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         this.key = key;
         this.hash = key.hashCode();
         this.cctx = cctx;
-        this.listenerLock = cctx.continuousQueries().getListenerReadLock();
+        this.listenerLock = cctx.group().listenerLock().readLock();
 
         ver = cctx.shared().versions().startVersion();
     }
@@ -5159,7 +5159,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 valid = entry.valid(tx.topologyVersion());
 
-                boolean needOldVal = cctx.shared().mvccCaching().continuousQueryListeners(cctx, tx, entry.key()) != null;
+                boolean needOldVal = tx.txState().useMvccCaching(cctx.cacheId());
 
                 cctx.shared().database().checkpointReadLock();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index e5adfcb..bc310ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -432,7 +432,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
                     assert entryProc != null || !op.isInvoke();
 
-                    boolean needOldVal = cctx.shared().mvccCaching().continuousQueryListeners(cctx, tx, key) != null;
+                    boolean needOldVal = tx.txState().useMvccCaching(cctx.cacheId());
 
                     GridCacheUpdateTxResult res;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
index 8f83b6e..06dfe01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -52,15 +54,14 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
 
 /**
- * Manager for caching MVCC transaction updates.
- * This updates can be used further in CQ, DR and other places.
+ * Manager for caching MVCC transaction updates. This updates can be used further in CQ, DR and other places.
  */
 public class MvccCachingManager extends GridCacheSharedManagerAdapter {
     /** Maximum possible transaction size when caching is enabled. */
     public static final int TX_SIZE_THRESHOLD = IgniteSystemProperties.getInteger(IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD,
         20_000);
 
-    /** Cached enlist values*/
+    /** Cached enlist values. */
     private final Map<GridCacheVersion, EnlistBuffer> enlistCache = new ConcurrentHashMap<>();
 
     /** Counters map. Used for OOM prevention caused by the big transactions. */
@@ -68,6 +69,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
 
     /**
      * Adds enlisted tx entry to cache.
+     *
      * @param key Key.
      * @param val Value.
      * @param ttl Time to live.
@@ -102,15 +104,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
 
         if (log.isDebugEnabled()) {
             log.debug("Added entry to mvcc cache: [key=" + key + ", val=" + val + ", oldVal=" + oldVal +
-                ", primary=" + primary + ", mvccVer=" + mvccVer + ", cacheId=" + cacheId + ", ver=" + ver +']');
+                ", primary=" + primary + ", mvccVer=" + mvccVer + ", cacheId=" + cacheId + ", ver=" + ver + ']');
         }
 
-        GridCacheContext ctx0 = cctx.cacheContext(cacheId);
-
-        // Do not cache updates if there is no DR or CQ enabled.
-        if (!needDrReplicate(ctx0, key) &&
-            F.isEmpty(continuousQueryListeners(ctx0, tx, key)) &&
-            !ctx0.group().hasContinuousQueryCaches())
+        // Do not cache updates if there are no DR or CQ were enabled when cache was added as active for the current tx.
+        if (!tx.txState().useMvccCaching(cacheId))
             return;
 
         AtomicInteger cntr = cntrs.computeIfAbsent(new TxKey(mvccVer.coordinatorVersion(), mvccVer.counter()),
@@ -124,17 +122,16 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
 
         EnlistBuffer cached = enlistCache.computeIfAbsent(ver, v -> new EnlistBuffer());
 
-        cached.add(primary ? null : futId, primary ? -1 : batchNum, key, e);
+        cached.add(primary ? null : futId, primary ? -1 : batchNum, e);
     }
 
     /**
-     *
      * @param tx Transaction.
      * @param commit {@code True} if commit.
      */
     public void onTxFinished(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
         if (log.isDebugEnabled())
-            log.debug("Transaction finished: [commit=" + commit + ", tx=" + tx  + ']');
+            log.debug("Transaction finished: [commit=" + commit + ", tx=" + tx + ']');
 
         if (tx.system() || tx.internal() || tx.mvccSnapshot() == null)
             return;
@@ -143,129 +140,191 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
 
         EnlistBuffer buf = enlistCache.remove(tx.xidVersion());
 
-        if (buf == null)
-            return;
+        Map<Integer, Map<KeyCacheObject, MvccTxEntry>> allCached = buf == null ? null : buf.getCached();
 
-        Map<KeyCacheObject, MvccTxEntry> cached = buf.getCached();
+        TxCounters txCntrs = tx.txCounters(false);
 
-        if (F.isEmpty(cached))
+        Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs == null ? null : txCntrs.updateCounters();
+
+        if (txCntrs == null || F.isEmpty(cntrsColl))
             return;
 
-        TxCounters txCntrs = tx.txCounters(false);
+        GridIntList cacheIds = tx.txState().cacheIds();
 
-        assert txCntrs != null || !commit;
+        assert cacheIds != null;
 
-        if (txCntrs == null)
-            return;
+        for (int i = 0; i < cacheIds.size(); i++) {
+            int cacheId = cacheIds.get(i);
 
-        Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs.updateCounters();
+            GridCacheContext ctx0 = cctx.cacheContext(cacheId);
 
-        if (F.isEmpty(cntrsColl)) {
-            assert !commit;
+            assert ctx0 != null;
 
-            return;
-        }
+            ctx0.group().listenerLock().readLock().lock();
 
-        // cacheId -> partId -> initCntr -> cntr + delta.
-        Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<>();
+            try {
+                boolean hasListeners = ctx0.hasContinuousQueryListeners(tx);
+                boolean drEnabled = ctx0.isDrEnabled();
 
-        for (PartitionUpdateCountersMessage msg : cntrsColl) {
-            for (int i = 0; i < msg.size(); i++) {
-                Map<Integer, T2<AtomicLong, Long>> cntrPerPart =
-                    cntrsMap.computeIfAbsent(msg.cacheId(), k -> new HashMap<>());
+                if (!hasListeners && !drEnabled)
+                    continue; // There are no listeners to notify.
 
-                T2 prev = cntrPerPart.put(msg.partition(i),
-                    new T2<>(new AtomicLong(msg.initialCounter(i)), msg.initialCounter(i) + msg.updatesCount(i)));
+                // Get cached entries for the given cache.
+                Map<KeyCacheObject, MvccTxEntry> cached = allCached == null ? null : allCached.get(cacheId);
 
-                assert prev == null;
-            }
-        }
+                Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = countersPerPartition(cntrsColl);
 
-        // Feed CQ & DR with entries.
-        for (Map.Entry<KeyCacheObject, MvccTxEntry> entry : cached.entrySet()) {
-            MvccTxEntry e = entry.getValue();
+                Map<Integer, T2<AtomicLong, Long>> cntrPerCache = cntrsMap.get(cacheId);
 
-            assert e.key().partition() != -1;
+                if (F.isEmpty(cntrPerCache))
+                    continue; // No updates were made for this cache.
 
-            Map<Integer, T2<AtomicLong, Long>> cntrPerCache = cntrsMap.get(e.cacheId());
+                boolean fakeEntries = false;
 
-            GridCacheContext ctx0 = cctx.cacheContext(e.cacheId());
+                if (F.isEmpty(cached)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Transaction updates were not cached fully (this can happen when listener started" +
+                            " during the transaction execution). [tx=" + tx + ']');
 
-            assert ctx0 != null && cntrPerCache != null;
+                    if (hasListeners) {
+                        cached = createFakeCachedEntries(cntrPerCache, tx, cacheId); // Create fake update entries if we have CQ listeners.
 
-            T2<AtomicLong, Long> cntr = cntrPerCache.get(e.key().partition());
+                        fakeEntries = true;
+                    }
+                    else
+                        continue; // Nothing to do further if tx is not cached entirely and there are no any CQ listeners.
+                }
 
-            long resCntr = cntr.getKey().incrementAndGet();
+                if (F.isEmpty(cached))
+                    continue;
 
-            assert resCntr <= cntr.getValue();
+                // Feed CQ & DR with entries.
+                for (Map.Entry<KeyCacheObject, MvccTxEntry> entry : cached.entrySet()) {
+                    MvccTxEntry e = entry.getValue();
 
-            e.updateCounter(resCntr);
+                    assert e.key().partition() != -1;
 
-            if (ctx0.group().sharedGroup()) {
-                ctx0.group().onPartitionCounterUpdate(ctx0.cacheId(), e.key().partition(), resCntr,
-                    tx.topologyVersion(), tx.local());
-            }
+                    assert cntrPerCache != null;
+                    assert e.cacheId() == cacheId;
 
-            if (log.isDebugEnabled())
-                log.debug("Process cached entry:" + e);
+                    T2<AtomicLong, Long> cntr = cntrPerCache.get(e.key().partition());
 
-            // DR
-            if (ctx0.isDrEnabled()) {
-                ctx0.dr().replicate(e.key(), e.value(), e.ttl(), e.expireTime(), e.version(),
-                    tx.local() ? DR_PRIMARY : DR_BACKUP, e.topologyVersion());
-            }
+                    long resCntr = cntr.getKey().incrementAndGet();
+
+                    assert resCntr <= cntr.getValue();
 
-            // CQ
-            CacheContinuousQueryManager contQryMgr = ctx0.continuousQueries();
-
-            if (ctx0.continuousQueries().notifyContinuousQueries(tx)) {
-                contQryMgr.getListenerReadLock().lock();
-
-                try {
-                    Map<UUID, CacheContinuousQueryListener> lsnrCol = continuousQueryListeners(ctx0, tx, e.key());
-
-                    if (!F.isEmpty(lsnrCol)) {
-                        contQryMgr.onEntryUpdated(
-                            lsnrCol,
-                            e.key(),
-                            commit ? e.value() : null, // Force skip update counter if rolled back.
-                            commit ? e.oldValue() : null, // Force skip update counter if rolled back.
-                            false,
-                            e.key().partition(),
-                            tx.local(),
-                            false,
-                            e.updateCounter(),
-                            null,
-                            e.topologyVersion());
+                    e.updateCounter(resCntr);
+
+                    if (ctx0.group().sharedGroup()) {
+                        ctx0.group().onPartitionCounterUpdate(cacheId, e.key().partition(), resCntr,
+                            tx.topologyVersion(), tx.local());
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Process cached entry:" + e);
+
+                    // DR
+                    if (ctx0.isDrEnabled() && !fakeEntries) {
+                        ctx0.dr().replicate(e.key(), e.value(), e.ttl(), e.expireTime(), e.version(),
+                            tx.local() ? DR_PRIMARY : DR_BACKUP, e.topologyVersion());
+                    }
+
+                    // CQ
+                    CacheContinuousQueryManager contQryMgr = ctx0.continuousQueries();
+
+                    if (ctx0.continuousQueries().notifyContinuousQueries(tx)) {
+                        Map<UUID, CacheContinuousQueryListener> lsnrCol = continuousQueryListeners(ctx0, tx);
+
+                        if (!F.isEmpty(lsnrCol)) {
+                            contQryMgr.onEntryUpdated(
+                                lsnrCol,
+                                e.key(),
+                                commit ? e.value() : null, // Force skip update counter if rolled back.
+                                commit ? e.oldValue() : null, // Force skip update counter if rolled back.
+                                false,
+                                e.key().partition(),
+                                tx.local(),
+                                false,
+                                e.updateCounter(),
+                                null,
+                                e.topologyVersion());
+                        }
                     }
-                }
-                finally {
-                    contQryMgr.getListenerReadLock().unlock();
                 }
             }
+            finally {
+                ctx0.group().listenerLock().readLock().unlock();
+            }
         }
     }
 
     /**
-     * @param ctx0 Cache context.
-     * @param key Key.
-     * @return {@code True} if need to replicate this value.
+     * Calculates counters updates per cache and partition: cacheId -> partId -> initCntr -> cntr + delta.
+     *
+     * @param cntrsColl Counters collection.
+     * @return Counters updates per cache and partition.
      */
-    public boolean needDrReplicate(GridCacheContext ctx0, KeyCacheObject key) {
-        return ctx0.isDrEnabled() && !key.internal();
+    private Map<Integer, Map<Integer, T2<AtomicLong, Long>>> countersPerPartition(
+        Collection<PartitionUpdateCountersMessage> cntrsColl) {
+        //
+        Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<>();
+
+        for (PartitionUpdateCountersMessage msg : cntrsColl) {
+            for (int i = 0; i < msg.size(); i++) {
+                Map<Integer, T2<AtomicLong, Long>> cntrPerPart =
+                    cntrsMap.computeIfAbsent(msg.cacheId(), k -> new HashMap<>());
+
+                T2 prev = cntrPerPart.put(msg.partition(i),
+                    new T2<>(new AtomicLong(msg.initialCounter(i)), msg.initialCounter(i) + msg.updatesCount(i)));
+
+                assert prev == null;
+            }
+        }
+
+        return cntrsMap;
+    }
+
+    /**
+     * If transaction was not cached entirely (if listener was set during tx execution), we should feed the CQ engine
+     * with a fake entries prepared by this method.
+     *
+     * @param cntrPerCache Update counters deltas made by transaction.
+     * @param tx Transaction.
+     * @param cacheId Cache id.
+     * @return Fake entries for each tx update.
+     */
+    private Map<KeyCacheObject, MvccTxEntry> createFakeCachedEntries(Map<Integer, T2<AtomicLong, Long>> cntrPerCache,
+        IgniteInternalTx tx, int cacheId) {
+        Map<KeyCacheObject, MvccTxEntry> fakeCached = new HashMap<>();
+
+        for (Map.Entry<Integer, T2<AtomicLong, Long>> e : cntrPerCache.entrySet()) {
+            int part = e.getKey();
+
+            long startCntr = e.getValue().get1().get(); // Init update counter.
+            long endCntr = e.getValue().get1().get() + e.getValue().get2(); // Init update counter + delta.
+
+            for (long i = startCntr; i < endCntr; i++) {
+                KeyCacheObject fakeKey = new KeyCacheObjectImpl("", null, part);
+
+                MvccTxEntry fakeEntry = new MvccTxEntry(fakeKey, null, 0, 0, tx.xidVersion(), null,
+                    tx.local(), tx.topologyVersion(), tx.mvccSnapshot(), cacheId);
+
+                fakeCached.put(fakeKey, fakeEntry);
+            }
+        }
+
+        return fakeCached;
     }
 
     /**
      * @param ctx0 Cache context.
      * @param tx Transaction.
-     * @param key Key.
      * @return Map of listeners to be notified by this update.
      */
-    public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0, @Nullable IgniteInternalTx tx, KeyCacheObject key) {
-        boolean internal = key != null && key.internal() || !ctx0.userCache();
-
+    public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0,
+        @Nullable IgniteInternalTx tx) {
         return ctx0.continuousQueries().notifyContinuousQueries(tx) ?
-            ctx0.continuousQueries().updateListeners(internal, false) : null;
+            ctx0.continuousQueries().updateListeners(!ctx0.userCache(), false) : null;
     }
 
     /**
@@ -276,11 +335,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
         /** Last DHT future id. */
         private IgniteUuid lastFutId;
 
-        /** Main buffer for entries. */
+        /** Main buffer for entries. CacheId -> entriesMap. */
         @GridToStringInclude
-        private Map<KeyCacheObject, MvccTxEntry> cached = new LinkedHashMap<>();
+        private Map<Integer, Map<KeyCacheObject, MvccTxEntry>> cached = new TreeMap<>();
 
-        /** Pending entries. */
+        /** Pending entries. BatchId -> entriesMap. */
         @GridToStringInclude
         private SortedMap<Integer, Map<KeyCacheObject, MvccTxEntry>> pending;
 
@@ -289,10 +348,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
          *
          * @param futId Future id.
          * @param batchNum Batch number.
-         * @param key Key.
          * @param e Entry.
          */
-        synchronized void add(IgniteUuid futId, int batchNum, KeyCacheObject key, MvccTxEntry e) {
+        synchronized void add(IgniteUuid futId, int batchNum, MvccTxEntry e) {
+            KeyCacheObject key = e.key();
+
             if (batchNum >= 0) {
                 /*
                  * Assume that batches within one future may be reordered. But batches between futures cannot be
@@ -307,7 +367,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
                 }
 
                 if (pending == null)
-                    pending = new TreeMap<>() ;
+                    pending = new TreeMap<>();
 
                 MvccTxEntry prev = pending.computeIfAbsent(batchNum, k -> new LinkedHashMap<>()).put(key, e);
 
@@ -317,8 +377,12 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
             else { // batchNum == -1 means no reordering (e.g. this is a primary node).
                 assert batchNum == -1;
 
-                MvccTxEntry prev = cached.put(key, e);
+                Map<KeyCacheObject, MvccTxEntry> entriesForCache = cached.computeIfAbsent(e.cacheId(), k -> new LinkedHashMap<>());
 
+                MvccTxEntry prev = entriesForCache.put(key, e);
+
+                // If key is updated more than once within transaction, we should copy old value
+                // (the value existed before tx started) from the previous entry to the new one.
                 if (prev != null && prev.oldValue() != null)
                     e.oldValue(prev.oldValue());
             }
@@ -327,7 +391,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
         /**
          * @return Cached entries map.
          */
-        synchronized Map<KeyCacheObject, MvccTxEntry> getCached() {
+        synchronized Map<Integer, Map<KeyCacheObject, MvccTxEntry>> getCached() {
             flushPending();
 
             return cached;
@@ -343,7 +407,17 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
             for (Map.Entry<Integer, Map<KeyCacheObject, MvccTxEntry>> entry : pending.entrySet()) {
                 Map<KeyCacheObject, MvccTxEntry> vals = entry.getValue();
 
-                cached.putAll(vals);
+                for (Map.Entry<KeyCacheObject, MvccTxEntry> e : vals.entrySet()) {
+                    Map<KeyCacheObject, MvccTxEntry> entriesForCache = cached
+                        .computeIfAbsent(e.getValue().cacheId(), k -> new LinkedHashMap<>());
+
+                    MvccTxEntry prev = entriesForCache.put(e.getKey(), e.getValue());
+
+                    // If key is updated more than once within transaction, we should copy old value
+                    // (the value existed before tx started) from the previous entry to the new one.
+                    if (prev != null && prev.oldValue() != null)
+                        e.getValue().oldValue(prev.oldValue());
+                }
             }
 
             pending.clear();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 503a514..7972c150 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -54,8 +54,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
@@ -78,6 +78,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap;
 
 /**
  * Continuous query handler.
@@ -185,6 +186,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** */
     private transient UUID routineId;
 
+    /** Local update counters values on listener start. Used for skipping events fired before the listener start. */
+    private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs;
+
     /** */
     private transient GridKernalContext ctx;
 
@@ -308,6 +312,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+        return locInitUpdCntrs;
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;
@@ -368,6 +377,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                    //noinspection unchecked
                     ctx.event().record(new CacheQueryExecutedEvent<>(
                         ctx.discovery().localNode(),
                         "Continuous query executed.",
@@ -386,6 +396,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 }
             }
 
+            @Override public void onRegister() {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx != null && !cctx.isLocal())
+                    locInitUpdCntrs = toCountersMap(cctx.topology().localUpdateCounters(false));
+            }
+
             @Override public boolean keepBinary() {
                 return keepBinary;
             }
@@ -406,6 +423,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 if (cctx == null)
                     return;
 
+                if (!needNotify(false, cctx, -1, -1,  evt))
+                    return;
+
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
@@ -513,6 +533,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 if (skipCtx == null)
                     skipCtx = new CounterSkipContext(part, cntr, topVer);
 
+                if (!needNotify(true, cctx, part, cntr, null))
+                    return skipCtx;
+
                 if (loc) {
                     assert !locOnly;
 
@@ -587,6 +610,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             @Override public boolean isPrimaryOnly() {
                 return locOnly && !skipPrimaryCheck;
             }
+
+            /**
+             * Checks whether it is need to notify listeners.
+             *
+             * @param skipEvt {@code True} if this is a skip counter event.
+             * @param cctx Cache context.
+             * @param part Partition id.
+             * @param cntr Update counter.
+             * @param evt CQ event.
+             * @return {@code True} if notification should happen immediately, or {@code false} if it should be delayed.
+             */
+            private boolean needNotify(boolean skipEvt,
+                GridCacheContext cctx,
+                int part,
+                long cntr,
+                CacheContinuousQueryEvent evt) {
+                assert !skipEvt || evt == null;
+                assert skipEvt || part == -1 && cntr == -1; // part == -1 && cntr == -1 means skip counter.
+
+                if (!cctx.mvccEnabled() || cctx.isLocal())
+                    return true;
+
+                assert locInitUpdCntrs != null;
+
+                cntr = skipEvt ? cntr : evt.getPartitionUpdateCounter();
+                part = skipEvt ? part : evt.partitionId();
+
+                T2<Long, Long> initCntr = locInitUpdCntrs.get(part);
+
+                // Do not notify listener if entry was updated before the query is started.
+                return initCntr == null || cntr >= initCntr.get2();
+            }
         };
 
         CacheContinuousQueryManager mgr = manager(ctx);
@@ -905,6 +960,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         if (recordIgniteEvt && notify) {
+            //noinspection unchecked
             ctx.event().record(new CacheQueryReadEvent<>(
                 ctx.discovery().localNode(),
                 "Continuous query executed.",
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 34e3d86..a652c51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.jetbrains.annotations.Nullable;
@@ -45,6 +46,12 @@ public interface CacheContinuousQueryListener<K, V> {
         boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut);
 
     /**
+     * Listener registration callback.
+     * NOTE: This method should be called under the {@link CacheGroupContext#listenerLock} write lock held.
+     */
+    public void onRegister();
+
+    /**
      * Listener unregistered callback.
      */
     public void onUnregister();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 5b93a7c..b9f8fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
@@ -69,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
@@ -131,9 +129,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /** Ordered topic prefix. */
     private String topicPrefix;
 
-    /** ReadWriteLock to control the continuous query setup - this is to prevent the race between cache update and listener setup */
-    private final StripedCompositeReadWriteLock listenerLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()) ;
-
     /** Cancelable future task for backup cleaner */
     private GridTimeoutProcessor.CancelableTask cancelableTask;
 
@@ -203,16 +198,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * Obtain the listener read lock, which must be held if any component need to
-     * read the list listener (generally caller to updateListener).
-     *
-     * @return Read lock for the listener update
-     */
-    public Lock getListenerReadLock() {
-        return listenerLock.readLock();
-    }
-
-    /**
      * @param tx Transaction.
      * @return {@code True} if should notify continuous query manager.
      */
@@ -956,21 +941,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 intLsnrCnt.incrementAndGet();
         }
         else {
-            listenerLock.writeLock().lock();
+            cctx.group().listenerLock().writeLock().lock();
 
             try {
-                if (lsnrCnt.get() == 0) {
-                    if (cctx.group().sharedGroup() && !cctx.isLocal())
-                        cctx.group().addCacheWithContinuousQuery(cctx);
-                }
-
                 added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
 
-                if (added)
+                if (added) {
                     lsnrCnt.incrementAndGet();
+
+                    lsnr.onRegister();
+
+                    if (lsnrCnt.get() == 1) {
+                        if (cctx.group().sharedGroup() && !cctx.isLocal())
+                            cctx.group().addCacheWithContinuousQuery(cctx);
+                    }
+                }
             }
             finally {
-                listenerLock.writeLock().unlock();
+                cctx.group().listenerLock().writeLock().unlock();
             }
 
             if (added)
@@ -996,7 +984,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             }
         }
         else {
-            listenerLock.writeLock().lock();
+            cctx.group().listenerLock().writeLock().lock();
 
             try {
                 if ((lsnr = lsnrs.remove(id)) != null) {
@@ -1007,7 +995,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 }
             }
             finally {
-                listenerLock.writeLock().unlock();
+                cctx.group().listenerLock().writeLock().unlock();
             }
 
             if (lsnr != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index dd7f108..2d31360 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1884,7 +1884,7 @@ public class IgniteTxHandler {
                         EntryProcessor entryProc = null;
                         Object[] invokeArgs = null;
 
-                        boolean needOldVal = ctx.shared().mvccCaching().continuousQueryListeners(ctx, tx, key) != null;
+                        boolean needOldVal = tx.txState().useMvccCaching(ctx.cacheId());
 
                         Message val0 = vals != null ? vals.get(i) : null;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index b1e1b02..c5f736a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -58,6 +58,9 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     /** */
     private boolean recovery;
 
+    /** */
+    private volatile boolean useMvccCaching;
+
     /** {@inheritDoc} */
     @Override public void addActiveCache(GridCacheContext ctx, boolean recovery, IgniteTxAdapter tx)
         throws IgniteCheckedException {
@@ -68,6 +71,8 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
         this.recovery = recovery;
 
         tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
+
+        useMvccCaching = cacheCtx.mvccEnabled() && (cacheCtx.isDrEnabled() || cacheCtx.hasContinuousQueryListeners(tx));
     }
 
     /** {@inheritDoc} */
@@ -307,6 +312,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean useMvccCaching(int cacheId) {
+        assert cacheCtx == null || cacheCtx.cacheId() == cacheId;
+
+        return useMvccCaching;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean recovery() {
         return recovery;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index c1d973e..e145e95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.jetbrains.annotations.Nullable;
@@ -35,6 +36,9 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     /** Active cache IDs. */
     private GridIntList activeCacheIds = new GridIntList();
 
+    /** Cache ids used for mvcc caching. See {@link MvccCachingManager}. */
+    private GridIntList mvccCachingCacheIds = new GridIntList();
+
     /** {@inheritDoc} */
     @Override public boolean implicitSingle() {
         return false;
@@ -81,8 +85,12 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
         int cacheId = cctx.cacheId();
 
         // Check if we can enlist new cache to transaction.
-        if (!activeCacheIds.contains(cacheId))
+        if (!activeCacheIds.contains(cacheId)) {
             activeCacheIds.add(cacheId);
+
+            if (cctx.mvccEnabled() && (cctx.hasContinuousQueryListeners(tx) || cctx.isDrEnabled()))
+                mvccCachingCacheIds.add(cacheId);
+        }
     }
 
     /** {@inheritDoc} */
@@ -116,4 +124,9 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
         assert false;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean useMvccCaching(int cacheId) {
+        return mvccCachingCacheIds.contains(cacheId);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 2039cc92..6feab42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -193,4 +194,11 @@ public interface IgniteTxState {
      * @return {@code True} if MVCC mode is enabled for transaction.
      */
     public boolean mvccEnabled();
+
+    /**
+     * @param cacheId Cache id.
+     * @return {@code True} if it is need to store in the heap updates made by the current TX for the given cache.
+     * These updates will be used for CQ and DR. See {@link MvccCachingManager}.
+     */
+    public boolean useMvccCaching(int cacheId);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 9cbea0f..b5a0539 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -76,6 +77,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     @GridToStringInclude
     protected Boolean mvccEnabled;
 
+    /** Cache ids used for mvcc caching. See {@link MvccCachingManager}. */
+    private GridIntList mvccCachingCacheIds = new GridIntList();
+
     /** {@inheritDoc} */
     @Override public boolean implicitSingle() {
         return false;
@@ -261,9 +265,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
                     ", cacheSystem=" + cacheCtx.systemTx() +
                     ", txSystem=" + tx.system() + ']');
             }
-            else
+            else {
                 activeCacheIds.add(cacheId);
 
+                if (cacheCtx.mvccEnabled() && (cacheCtx.hasContinuousQueryListeners(tx) || cacheCtx.isDrEnabled()))
+                    mvccCachingCacheIds.add(cacheId);
+            }
+
             if (activeCacheIds.size() == 1)
                 tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
         }
@@ -491,6 +499,11 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean useMvccCaching(int cacheId) {
+        return mvccCachingCacheIds.contains(cacheId);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteTxStateImpl.class, this, "txMap", allEntriesCopy());
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
index b12ee56..515aeef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -37,7 +37,7 @@ public class TxCounters {
     private final Map<Integer, Map<Integer, AtomicLong>> updCntrsAcc = new HashMap<>();
 
     /** Final update counters for cache partitions in the end of transaction */
-    private Collection<PartitionUpdateCountersMessage> updCntrs;
+    private volatile Collection<PartitionUpdateCountersMessage> updCntrs;
 
     /** Counter tracking number of entries locked by tx. */
     private final AtomicInteger lockCntr = new AtomicInteger();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 9801746..7cd4bf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -163,4 +163,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
      */
     public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
         Map<Integer, T2<Long, Long>> cntrs);
+
+    /**
+     * @return Init state for partition counters.
+     */
+    public Map<Integer, T2<Long, Long>> updateCounters();
 }
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 2456efa..bbd2290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1423,8 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(),
-                        toCountersMap(cache.context().topology().localUpdateCounters(false)));
+                    req.addUpdateCounters(ctx.localNodeId(), hnd0.updateCounters());
             }
         }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java
index be1705a..9c2facc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java
@@ -53,7 +53,7 @@ public class PageEvictionMetricTest extends PageEvictionAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
     @Test
     public void testPageEvictionMetricMvcc() throws Exception {
         checkPageEvictionMetric(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java
index 1b54833..13e54eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java
@@ -91,7 +91,7 @@ public class PageEvictionPagesRecyclingAndReusingTest extends PageEvictionAbstra
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
     @Test
     public void testPagesRecyclingAndReusingMvccTxPartitioned() throws Exception {
         testPagesRecyclingAndReusing(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.PARTITIONED);
@@ -101,7 +101,7 @@ public class PageEvictionPagesRecyclingAndReusingTest extends PageEvictionAbstra
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
     @Test
     public void testPagesRecyclingAndReusingMvccTxReplicated() throws Exception {
         testPagesRecyclingAndReusing(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.REPLICATED);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java
index 8306084..12a2097 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java
@@ -79,7 +79,7 @@ public class PageEvictionTouchOrderTest extends PageEvictionAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448,https://issues.apache.org/jira/browse/IGNITE-7956")
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738,https://issues.apache.org/jira/browse/IGNITE-7956")
     @Test
     public void testTouchOrderWithFairFifoEvictionMvccTxReplicated() throws Exception {
         testTouchOrderWithFairFifoEviction(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.REPLICATED);
@@ -88,7 +88,7 @@ public class PageEvictionTouchOrderTest extends PageEvictionAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448,https://issues.apache.org/jira/browse/IGNITE-7956")
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738,https://issues.apache.org/jira/browse/IGNITE-7956")
     @Test
     public void testTouchOrderWithFairFifoEvictionMvccTxPartitioned() throws Exception {
         testTouchOrderWithFairFifoEviction(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.PARTITIONED);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java
index 38b5dc7..cedd773 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java
@@ -41,7 +41,7 @@ public abstract class PageEvictionWithRebalanceAbstractTest extends PageEviction
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
     @Test
     public void testEvictionWithRebalanceMvcc() throws Exception {
         checkEvictionWithRebalance(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index 928c33a..2757650 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionSerializationException;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -315,7 +314,6 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10755")
     @Test
     public void testConcurrentUpdatesAndQueryStartMvccTxCacheGroup() throws Exception {
         concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, true);
@@ -429,7 +427,8 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
                 U.sleep(1000);
 
                 for (String cache : caches)
-                    qrys.add(startListener(client.cache(cache)));
+                    for (int l = 0; l < 10; l++)
+                        qrys.add(startListener(client.cache(cache)));
 
                 U.sleep(1000);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
index dd01540..bbcd62f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
@@ -17,9 +17,20 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.configuration.FactoryBuilder;
-
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -33,19 +44,6 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.NotNull;
-
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -228,7 +226,7 @@ public class CacheContinuousQueryExecuteInPrimaryTest extends GridCommonAbstract
                         }
                     }));
 
-                executeQuery(cache, qry, ccfg.getAtomicityMode() == TRANSACTIONAL);
+                executeQuery(cache, qry, ccfg.getAtomicityMode() != ATOMIC);
             }
 
             assertTrue(noOneListen.get());
@@ -326,7 +324,7 @@ public class CacheContinuousQueryExecuteInPrimaryTest extends GridCommonAbstract
             ));
 
             // Execute query.
-            executeQuery(cache, qry, ccfg.getAtomicityMode() == TRANSACTIONAL);
+            executeQuery(cache, qry, ccfg.getAtomicityMode() != ATOMIC);
 
             assertTrue(latch.await(LATCH_TIMEOUT, MILLISECONDS));
             assertEquals(16, cnt.get());