You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/20 18:04:34 UTC

[41/50] [abbrv] ignite git commit: IGNITE-9828 MVCC: Notify Continuous Query manager about missed updates during PME. This closes #5189.

IGNITE-9828 MVCC: Notify Continuous Query manager about missed updates during PME. This closes #5189.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74f312e0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74f312e0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74f312e0

Branch: refs/heads/ignite-9720
Commit: 74f312e0c863824f1cefdd589a7a0d7da140a276
Parents: eb88885
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 20 15:26:31 2018 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Tue Nov 20 15:26:31 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  12 +-
 .../GridCachePartitionExchangeManager.java      |   4 +-
 .../cache/IgniteCacheOffheapManager.java        |   4 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |   4 +-
 .../cache/PartitionUpdateCounter.java           |  28 +-
 .../GridDistributedTxRemoteAdapter.java         |   2 +
 .../dht/GridDhtTransactionalCacheAdapter.java   |  53 ++-
 .../dht/GridDhtTxAbstractEnlistFuture.java      |  10 +-
 .../GridDhtPartitionsExchangeFuture.java        |  49 +++
 .../topology/GridClientPartitionTopology.java   |   8 +-
 .../dht/topology/GridDhtLocalPartition.java     |   7 +-
 .../dht/topology/GridDhtPartitionTopology.java  |   9 +-
 .../topology/GridDhtPartitionTopologyImpl.java  |  51 ++-
 .../cache/mvcc/MvccCachingManager.java          |  30 +-
 .../persistence/GridCacheOffheapManager.java    |   5 +-
 .../continuous/CacheContinuousQueryManager.java |  33 ++
 .../cache/transactions/IgniteTxAdapter.java     |   3 +
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../continuous/GridContinuousProcessor.java     |   6 +-
 ...tinuousQueryAsyncFailoverMvccTxSelfTest.java |  53 +++
 ...ContinuousQueryFailoverAbstractSelfTest.java |  90 ++++-
 ...usQueryFailoverMvccTxReplicatedSelfTest.java |  31 ++
 ...heContinuousQueryFailoverMvccTxSelfTest.java |  48 +++
 .../mvcc/CacheMvccBasicContinuousQueryTest.java | 363 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite4.java         |   6 +
 25 files changed, 821 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9124884..96f4dcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -6733,12 +6733,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         ensureFreeSpace();
 
+        CacheObject val = null;
+        CacheObject oldVal = null;
+
         lockEntry();
 
         try {
             checkObsolete();
 
-            CacheObject val = null;
 
             for (int i = 0; i < entries.size(); i++) {
                 GridCacheMvccEntryInfo info = (GridCacheMvccEntryInfo)entries.get(i);
@@ -6759,7 +6761,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     info.newMvccTxState());
             }
 
-            CacheObject oldVal = null;
 
             // Assume list contains  only previous committed row and rows changed by the current tx.
             if (!entries.isEmpty()) {
@@ -6806,7 +6807,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
         }
 
-        return new GridCacheUpdateTxResult(true, logPtr);
+        GridCacheUpdateTxResult res = new GridCacheUpdateTxResult(true, logPtr);
+
+        res.newValue(val);
+        res.oldValue(oldVal);
+
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 55c5d6c..fa63b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1475,7 +1475,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     grp.affinity().similarAffinityKey());
 
                 if (sndCounters) {
-                    CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true, true);
+                    CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true);
 
                     m.addPartitionUpdateCounters(grp.groupId(),
                         newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
@@ -1499,7 +1499,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 top.similarAffinityKey());
 
             if (sndCounters) {
-                CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true, true);
+                CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true);
 
                 m.addPartitionUpdateCounters(top.groupId(),
                     newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index d311708..2cf302f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1117,8 +1117,10 @@ public interface IgniteCacheOffheapManager {
 
         /**
          * Flushes pending update counters closing all possible gaps.
+         *
+         * @return Even-length array of pairs [start, end] for each gap.
          */
-        public void finalizeUpdateCountres();
+        GridLongList finalizeUpdateCounters();
 
         /**
          * Preload a store into page memory.

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5f467b3..08ce978 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1580,8 +1580,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public void finalizeUpdateCountres() {
-            pCntr.finalizeUpdateCountres();
+        @Override public GridLongList finalizeUpdateCounters() {
+            return pCntr.finalizeUpdateCounters();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index fe44708..39d8d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.GridLongList;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -180,14 +181,31 @@ public class PartitionUpdateCounter {
 
     /**
      * Flushes pending update counters closing all possible gaps.
+     *
+     * @return Even-length array of pairs [start, end] for each gap.
      */
-    public synchronized void finalizeUpdateCountres() {
-        Item last = queue.pollLast();
+    public synchronized GridLongList finalizeUpdateCounters() {
+        Item item = poll();
+
+        GridLongList gaps = null;
+
+        while (item != null) {
+            if (gaps == null)
+                gaps = new GridLongList((queue.size() + 1) * 2);
+
+            long start = cntr.get() + 1;
+            long end = item.start;
 
-        if (last != null)
-            update(last.start + last.delta);
+            gaps.add(start);
+            gaps.add(end);
+
+            // Close pending ranges.
+            update(item.start + item.delta);
+
+            item = poll();
+        }
 
-        queue.clear();
+        return gaps;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c93e771..a915478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -935,6 +935,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         catch (IgniteCheckedException | RuntimeException | Error e) {
             state(UNKNOWN);
 
+            U.error(log, "Error during tx rollback.", e);
+
             if (e instanceof IgniteCheckedException)
                 throw new IgniteException(e);
             else if (e instanceof RuntimeException)

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 60463b4..4693232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1960,23 +1960,24 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 req.subjectId(),
                 req.taskNameHash());
         }
-        catch (IgniteCheckedException | IgniteException ex) {
+        catch (Throwable e) {
             GridNearTxQueryResultsEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(req.cacheId(),
                 req.futureId(),
                 req.miniId(),
                 req.version(),
-                ex);
+                e);
 
             try {
                 ctx.io().send(nearNode, res, ctx.ioPolicy());
             }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send near enlist response [" +
-                    "txId=" + req.version() +
-                    ", node=" + nodeId +
-                    ", res=" + res + ']', e);
+            catch (IgniteCheckedException ioEx) {
+                U.error(log, "Failed to send near enlist response " +
+                    "[txId=" + req.version() + ", node=" + nodeId + ", res=" + res + ']', ioEx);
             }
 
+            if (e instanceof Error)
+                throw (Error) e;
+
             return;
         }
 
@@ -2231,26 +2232,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
     /**
      * @param primary Primary node.
-     * @param req Request.
-     * @param e Error.
-     */
-    private void onError(UUID primary, GridDhtTxQueryEnlistRequest req, Throwable e) {
-        GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(ctx.cacheId(),
-            req.dhtFutureId(),
-            req.batchId(),
-            e);
-
-        try {
-            ctx.io().send(primary, res, ctx.ioPolicy());
-        }
-        catch (IgniteCheckedException ioEx) {
-            U.error(log, "Failed to send DHT enlist reply to primary node [node: " + primary + ", req=" + req +
-                ']', ioEx);
-        }
-    }
-
-    /**
-     * @param primary Primary node.
      * @param req Message.
      * @param first Flag if this is a first request in current operation.
      */
@@ -2321,8 +2302,22 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     req + ']', ioEx);
             }
         }
-        catch (IgniteCheckedException e) {
-            onError(primary, req, e);
+        catch (Throwable e) {
+            GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(ctx.cacheId(),
+                req.dhtFutureId(),
+                req.batchId(),
+                e);
+
+            try {
+                ctx.io().send(primary, res, ctx.ioPolicy());
+            }
+            catch (IgniteCheckedException ioEx) {
+                U.error(log, "Failed to send DHT enlist reply to primary node " +
+                    "[node: " + primary + ", req=" + req + ']', ioEx);
+            }
+
+            if (e instanceof Error)
+                throw (Error) e;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index cfa8eb7..9949930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -629,7 +629,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                 updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1);
 
         if (op != EnlistOperation.LOCK)
-            addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId());
+            addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), op);
     }
 
     /**
@@ -642,7 +642,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
      * @param cacheId Cache Id.
      */
     private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist,
-        int cacheId) throws IgniteCheckedException {
+        int cacheId, EnlistOperation op) throws IgniteCheckedException {
         List<ClusterNode> backups = backupNodes(key);
 
         int part = cctx.affinity().partition(key);
@@ -680,13 +680,15 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
             if (batch == null)
                 batches.put(node.id(), batch = new Batch(node));
 
-            if (moving && hist0 == null) {
+            if (moving && hist0 == null && !op.isInvoke()) {
                 assert !F.isEmpty(hist);
 
                 hist0 = fetchHistoryInfo(key, hist);
             }
 
-            batch.add(key, moving ? hist0 : val);
+            Message m = moving && !op.isInvoke() ? hist0 : val;
+
+            batch.add(key, m);
 
             if (batch.size() == BATCH_SIZE) {
                 assert batches != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a79c95f..44fc266 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1466,6 +1466,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         changeWalModeIfNeeded();
 
+        if (events().hasServerLeft())
+            finalizePartitionCounters();
+
         cctx.exchange().exchangerBlockingSectionBegin();
 
         try {
@@ -3465,6 +3468,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Collects non local cache groups.
+     *
+     * @return Collection of non local cache groups.
+     */
+    private List<CacheGroupContext> nonLocalCacheGroups() {
+        return cctx.cache().cacheGroups().stream()
+            .filter(grp -> !grp.isLocal() && !cacheGroupStopping(grp.groupId()))
+            .collect(Collectors.toList());
+    }
+
+    /**
      * Validates that partition update counters and cache sizes for all caches are consistent.
      */
     private void validatePartitionsState() {
@@ -3548,6 +3562,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Removes gaps in the local update counters. Gaps in update counters are possible on backup node when primary
+     * failed to send update counter deltas to backup.
+     */
+    private void finalizePartitionCounters() {
+        long time = System.currentTimeMillis();
+
+        try {
+            int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
+
+            // Reserve at least 2 threads for system operations.
+            parallelismLvl = Math.max(1, parallelismLvl - 2);
+
+            if (parallelismLvl > 1) {
+                U.doInParallel(parallelismLvl,
+                    cctx.kernalContext().getSystemExecutorService(),
+                    nonLocalCacheGroups(),
+                    grp -> {
+                        grp.topology().finalizeUpdateCounters();
+
+                        return null;
+                    }
+                );
+            }
+            else
+                nonLocalCacheGroups().forEach(grp -> grp.topology().finalizeUpdateCounters());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to finalize partition counters", e);
+        }
+
+        if (log.isInfoEnabled())
+            log.info("Partition counters finalization performed in " + (System.currentTimeMillis() - time) + " ms.");
+    }
+
+    /**
      * @param finishState State.
      * @param msg Request.
      * @param nodeId Node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 01db508..b1d1048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1232,12 +1232,16 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
-        boolean finalizeCntrsBeforeCollecting) {
+    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
         return CachePartitionPartialCountersMap.EMPTY;
     }
 
     /** {@inheritDoc} */
+    @Override public void finalizeUpdateCounters() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public Map<Integer, Long> partitionSizes() {
         return Collections.emptyMap();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 8ee7a9d..f3516a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntry
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1362,9 +1363,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
     /**
      * Flushes pending update counters closing all possible gaps.
+     *
+     * @return Even-length array of pairs [start, end] for each gap.
      */
-    public void finalizeUpdateCountres() {
-        store.finalizeUpdateCountres();
+    public GridLongList finalizeUpdateCounters() {
+        return store.finalizeUpdateCounters();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 25b284e..be8a789 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -345,17 +345,20 @@ public interface GridDhtPartitionTopology {
     public Collection<Integer> lostPartitions();
 
     /**
+     * Pre-processes partition update counters before exchange.
+     */
+    void finalizeUpdateCounters();
+
+    /**
      * @return Partition update counters.
      */
     public CachePartitionFullCountersMap fullUpdateCounters();
 
     /**
      * @param skipZeros {@code True} for adding zero counter to map.
-     * @param finalizeCntrsBeforeCollecting {@code True} indicates that partition counters should be finalized.
      * @return Partition update counters.
      */
-    public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
-        boolean finalizeCntrsBeforeCollecting);
+    public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
 
     /**
      * @return Partition cache sizes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 6418dc7..a127876 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -2670,6 +2672,49 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * Pre-processes partition update counters before exchange.
+     */
+    @Override public void finalizeUpdateCounters() {
+        if (!grp.mvccEnabled())
+            return;
+
+        // It is need to acquire checkpoint lock before topology lock acquiring.
+        ctx.database().checkpointReadLock();
+
+        try {
+            lock.readLock().lock();
+
+            try {
+                for (int i = 0; i < locParts.length(); i++) {
+                    GridDhtLocalPartition part = locParts.get(i);
+
+                    if (part != null && part.state().active()) {
+                        // We need to close all gaps in partition update counters sequence. We assume this finalizing is
+                        // happened on exchange and hence all txs are completed. Therefore each gap in update counters
+                        // sequence is a result of undelivered DhtTxFinishMessage on backup (sequences on primary nodes
+                        // do not have gaps). Here we close these gaps and asynchronously notify continuous query engine
+                        // about the skipped events.
+                        AffinityTopologyVersion topVer = ctx.exchange().readyAffinityVersion();
+
+                        GridLongList gaps = part.finalizeUpdateCounters();
+
+                        if (gaps != null) {
+                            for (GridCacheContext ctx0 : grp.caches())
+                                ctx0.continuousQueries().closeBackupUpdateCountersGaps(ctx0, part.id(), topVer, gaps);
+                        }
+                    }
+                }
+            }
+            finally {
+                lock.readLock().unlock();
+            }
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public CachePartitionFullCountersMap fullUpdateCounters() {
         lock.readLock().lock();
@@ -2683,8 +2728,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
-        boolean finalizeCntrsBeforeCollecting) {
+    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {
@@ -2705,9 +2749,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                if (finalizeCntrsBeforeCollecting)
-                    part.finalizeUpdateCountres();
-
                 long updCntr = part.updateCounter();
                 long initCntr = part.initialUpdateCounter();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
index 259f69b..8f83b6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
@@ -40,8 +40,10 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -146,16 +148,23 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
 
         Map<KeyCacheObject, MvccTxEntry> cached = buf.getCached();
 
-        if (F.isEmpty(cached) || !commit)
+        if (F.isEmpty(cached))
             return;
 
         TxCounters txCntrs = tx.txCounters(false);
 
-        assert txCntrs != null;
+        assert txCntrs != null || !commit;
 
-        Collection<PartitionUpdateCountersMessage> cntrsColl =  txCntrs.updateCounters();
+        if (txCntrs == null)
+            return;
+
+        Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs.updateCounters();
 
-        assert  !F.isEmpty(cntrsColl) : cntrsColl;
+        if (F.isEmpty(cntrsColl)) {
+            assert !commit;
+
+            return;
+        }
 
         // cacheId -> partId -> initCntr -> cntr + delta.
         Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<>();
@@ -219,8 +228,8 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
                         contQryMgr.onEntryUpdated(
                             lsnrCol,
                             e.key(),
-                            e.value(),
-                            e.oldValue(),
+                            commit ? e.value() : null, // Force skip update counter if rolled back.
+                            commit ? e.oldValue() : null, // Force skip update counter if rolled back.
                             false,
                             e.key().partition(),
                             tx.local(),
@@ -253,7 +262,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
      * @return Map of listeners to be notified by this update.
      */
     public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0, @Nullable IgniteInternalTx tx, KeyCacheObject key) {
-        boolean internal = key.internal() || !ctx0.userCache();
+        boolean internal = key != null && key.internal() || !ctx0.userCache();
 
         return ctx0.continuousQueries().notifyContinuousQueries(tx) ?
             ctx0.continuousQueries().updateListeners(internal, false) : null;
@@ -268,9 +277,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
         private IgniteUuid lastFutId;
 
         /** Main buffer for entries. */
+        @GridToStringInclude
         private Map<KeyCacheObject, MvccTxEntry> cached = new LinkedHashMap<>();
 
         /** Pending entries. */
+        @GridToStringInclude
         private SortedMap<Integer, Map<KeyCacheObject, MvccTxEntry>> pending;
 
         /**
@@ -337,5 +348,10 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
 
             pending.clear();
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EnlistBuffer.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 48e86e4..f24900f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1722,12 +1722,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public void finalizeUpdateCountres() {
+        @Override public GridLongList finalizeUpdateCounters() {
             try {
                 CacheDataStore delegate0 = init0(true);
 
-                if (delegate0 != null)
-                    delegate0.finalizeUpdateCountres();
+                return delegate0 != null ? delegate0.finalizeUpdateCounters() : null;
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6bd3fc2..4c399bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
@@ -67,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
@@ -275,6 +277,37 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * For cache updates in shared cache group need notify others caches CQ listeners
+     * that generated counter should be skipped.
+     *
+     * @param cctx Cache context.
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @param gaps Even-length array of pairs [start, end] for each gap.
+     */
+    @Nullable public void closeBackupUpdateCountersGaps(GridCacheContext cctx,
+        int part,
+        AffinityTopologyVersion topVer,
+        GridLongList gaps) {
+        assert gaps != null && gaps.size() % 2 == 0;
+
+        for (int i = 0; i < gaps.size() / 2; i++) {
+            long gapStart = gaps.get(i * 2);
+            long gapStop = gaps.get(i * 2 + 1);
+
+            /*
+             * No user listeners should be called by this invocation. In the common case of partitioned cache or
+             * replicated cache with non-local-only listener gaps (dummy filtered CQ events) will be added to the
+             * backup queue without passing it to any listener. In the special case of local-only listener on
+             * replicated cache there is no backup queues used at all and therefore no gaps occur - all unfiltered
+             * events are passed to listeners upon arrive.
+             */
+            for (long cntr = gapStart; cntr <= gapStop; cntr++)
+                skipUpdateEvent(lsnrs, null, part, cntr, false, topVer);
+        }
+    }
+
+    /**
      * @param internal Internal entry flag (internal key or not user cache).
      * @param preload Whether update happened during preloading.
      * @return Registered listeners.

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 8b24e01..52659e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -273,6 +273,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     protected ConsistentIdMapper consistentIdMapper;
 
     /** Mvcc tx update snapshot. */
+    @GridToStringInclude
     protected volatile MvccSnapshot mvccSnapshot;
 
     /** Rollback finish future. */
@@ -280,6 +281,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     private volatile IgniteInternalFuture rollbackFut;
 
     /** */
+    @SuppressWarnings("unused")
+    @GridToStringExclude
     private volatile TxCounters txCounters;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d715362..3685f7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1895,7 +1895,7 @@ public class IgniteTxHandler {
                     invokeArgs = invokeVal.invokeArgs();
                 }
 
-                assert entryProc != null || !op.isInvoke();
+                assert entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
 
                 GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fbfd99b..4569f65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1423,7 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
                     req.addUpdateCounters(ctx.localNodeId(),
-                        toCountersMap(cache.context().topology().localUpdateCounters(false, false)));
+                        toCountersMap(cache.context().topology().localUpdateCounters(false)));
             }
         }
 
@@ -1564,7 +1564,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                                 if (cache != null && !cache.isLocal() && cache.context().userCache()) {
                                     CachePartitionPartialCountersMap cntrsMap =
-                                        cache.context().topology().localUpdateCounters(false, false);
+                                        cache.context().topology().localUpdateCounters(false);
 
                                     cntrs = U.marshal(marsh, cntrsMap);
                                 }
@@ -2504,7 +2504,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                         if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
                             cntrsPerNode.put(ctx.localNodeId(),
-                                toCountersMap(cctx.topology().localUpdateCounters(false, false)));
+                                toCountersMap(cctx.topology().localUpdateCounters(false)));
 
                         routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java
new file mode 100644
index 0000000..334d219
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverMvccTxSelfTest  extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testBackupQueueEvict() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testLeftPrimaryAndBackupNodes() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10047");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 0fef7b2..e7c2261 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -74,8 +74,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
@@ -104,6 +105,7 @@ import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
@@ -111,6 +113,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  *
@@ -521,7 +525,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
 
             CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology()
-                .localUpdateCounters(false, false);
+                .localUpdateCounters(false);
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
                 if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {
@@ -760,6 +764,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             }
         }, 5000L);
 
+        awaitPartitionMapExchange();
+
         for (; keyIter < keys.size(); keyIter++) {
             int key = keys.get(keyIter);
 
@@ -784,7 +790,18 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                     expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
             }
 
-            clnCache.put(key, val);
+            boolean updated = false;
+
+            while (!updated) {
+                try {
+                    clnCache.put(key, val);
+
+                    updated = true;
+                }
+                catch (Exception ignore) {
+                    assertEquals(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, atomicityMode());
+                }
+            }
 
             filtered = !filtered;
         }
@@ -977,8 +994,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 T2<Object, Object> t = updates.get(key);
 
                 if (updateFromClient) {
-                    if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
-                        try (Transaction tx = qryClient.transactions().txStart()) {
+                    if (atomicityMode() != CacheAtomicityMode.ATOMIC) {
+                        try (Transaction tx = qryClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                             qryClientCache.put(key, key);
 
                             tx.commit();
@@ -993,8 +1010,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                         qryClientCache.put(key, key);
                 }
                 else {
-                    if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
-                        try (Transaction tx = ignite.transactions().txStart()) {
+                    if (atomicityMode() != CacheAtomicityMode.ATOMIC) {
+                        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                             cache.put(key, key);
 
                             tx.commit();
@@ -1755,18 +1772,30 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 if (filtered)
                     val = -val;
 
-                if (processorPut && prevVal != null) {
-                    qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
-                        @Override public Void process(MutableEntry<Object, Object> entry,
-                            Object... arguments) throws EntryProcessorException {
-                            entry.setValue(arguments[0]);
+                boolean updated = false;
+
+                while (!updated) {
+                    try {
+                        if (processorPut && prevVal != null) {
+                            qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+                                @Override public Void process(MutableEntry<Object, Object> entry,
+                                    Object... arguments) throws EntryProcessorException {
+                                    entry.setValue(arguments[0]);
 
-                            return null;
+                                    return null;
+                                }
+                            }, val);
                         }
-                    }, val);
+                        else
+                            qryClnCache.put(key, val);
+
+                        updated = true;
+                    }
+                    catch (CacheException e) {
+                        assertTrue(X.hasCause(e, TransactionRollbackException.class));
+                        assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+                    }
                 }
-                else
-                    qryClnCache.put(key, val);
 
                 processorPut = !processorPut;
 
@@ -2020,7 +2049,20 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
                         Integer val = valCntr.incrementAndGet();
 
-                        Integer prevVal = (Integer)qryClnCache.getAndPut(key, val);
+                        Integer prevVal = null;
+
+                        boolean updated = false;
+
+                        while (!updated) {
+                            try {
+                                prevVal = (Integer)qryClnCache.getAndPut(key, val);
+
+                                updated = true;
+                            }
+                            catch (CacheException e) {
+                                assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+                            }
+                        }
 
                         expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal));
 
@@ -2114,7 +2156,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 @Override public Object call() throws Exception {
                     Integer val0 = val.getAndIncrement();
 
-                    cache.put(key, val0);
+                    boolean updated = false;
+
+                    while (!updated) {
+                        try {
+                            cache.put(key, val0);
+
+                            updated = true;
+                        }
+                        catch (CacheException e) {
+                            assertTrue(X.hasCause(e, TransactionRollbackException.class));
+                            assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+                        }
+                    }
 
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java
new file mode 100644
index 0000000..2576d23
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverMvccTxReplicatedSelfTest extends CacheContinuousQueryFailoverMvccTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java
new file mode 100644
index 0000000..76d1689
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverMvccTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testBackupQueueEvict() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testLeftPrimaryAndBackupNodes() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10047");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
index ed97b1b..e08341a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
@@ -17,11 +17,14 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.event.CacheEntryEvent;
@@ -29,23 +32,35 @@ import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 
 /**
  * Basic continuous queries test with enabled mvcc.
@@ -227,4 +242,352 @@ public class CacheMvccBasicContinuousQueryTest extends CacheMvccAbstractTest  {
             }
         },  CacheException.class, "Failed to run update. Transaction is too large. Consider reducing transaction size");
     }
+
+    /**
+     * @throws Exception  If failed.
+     */
+    public void testUpdateCountersGapClosedSimplePartitioned() throws Exception {
+        checkUpdateCountersGapIsProcessedSimple(CacheMode.PARTITIONED);
+    }
+
+    /**
+     * @throws Exception  If failed.
+     */
+    public void testUpdateCountersGapClosedSimpleReplicated() throws Exception {
+        checkUpdateCountersGapIsProcessedSimple(CacheMode.REPLICATED);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    private void checkUpdateCountersGapIsProcessedSimple(CacheMode cacheMode) throws Exception {
+        testSpi = true;
+
+        int srvCnt = 4;
+
+        startGridsMultiThreaded(srvCnt);
+
+        client = true;
+
+        IgniteEx nearNode = startGrid(srvCnt);
+
+        IgniteCache<Object, Object> cache = nearNode.createCache(
+            cacheConfiguration(cacheMode, FULL_SYNC, srvCnt - 1, srvCnt)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        IgniteEx primary = grid(0);
+
+        List<Integer> keys = primaryKeys(primary.cache(DEFAULT_CACHE_NAME), 3);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        List<CacheEntryEvent> arrivedEvts = new ArrayList<>();
+
+        CountDownLatch latch = new CountDownLatch(2);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent e : evts) {
+                    arrivedEvts.add(e);
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        QueryCursor<Cache.Entry<Integer, Integer>> cur = nearNode.cache(DEFAULT_CACHE_NAME).query(qry);
+
+        // Initial value.
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(0))).getAll();
+
+        Transaction txA = nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+        // prevent first transaction prepare on backups
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(primary);
+
+        spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                private final AtomicInteger limiter = new AtomicInteger();
+
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtTxPrepareRequest)
+                        return limiter.getAndIncrement() < srvCnt - 1;
+
+                    if (msg instanceof GridContinuousMessage)
+                        return true;
+
+                    return false;
+                }
+            });
+
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(1))).getAll();
+
+        txA.commitAsync();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return nearNode.context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == PREPARING);
+            }
+        }, 3_000);
+
+        GridTestUtils.runAsync(() -> {
+            try (Transaction txB = nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(2)));
+
+                txB.commit();
+            }
+        }).get();
+
+        long primaryUpdCntr = getUpdateCounter(primary, keys.get(0));
+
+        assertEquals(3, primaryUpdCntr); // There were three updates.
+
+        // drop primary
+        stopGrid(primary.name());
+
+        // Wait all txs are rolled back.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                boolean allRolledBack = true;
+
+                for (int i = 1; i < srvCnt; i++) {
+                    boolean rolledBack = grid(i).context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == ROLLED_BACK);
+
+                    allRolledBack &= rolledBack;
+                }
+
+                return allRolledBack;
+            }
+        }, 3_000);
+
+        for (int i = 1; i < srvCnt; i++) {
+            IgniteCache backupCache = grid(i).cache(DEFAULT_CACHE_NAME);
+
+            int size = backupCache.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+
+            long backupCntr = getUpdateCounter(grid(i), keys.get(0));
+
+            assertEquals(2, size);
+            assertEquals(primaryUpdCntr, backupCntr);
+        }
+
+        assertTrue(latch.await(3, SECONDS));
+
+        assertEquals(2, arrivedEvts.size());
+        assertEquals(keys.get(0), arrivedEvts.get(0).getKey());
+        assertEquals(keys.get(2), arrivedEvts.get(1).getKey());
+
+        cur.close();
+        nearNode.close();
+    }
+
+    /**
+     * @throws Exception  If failed.
+     */
+    public void testUpdateCountersGapClosedPartitioned() throws Exception {
+        checkUpdateCountersGapsClosed(CacheMode.PARTITIONED);
+    }
+
+    /**
+     * @throws Exception  If failed.
+     */
+    public void testUpdateCountersGapClosedReplicated() throws Exception {
+        checkUpdateCountersGapsClosed(CacheMode.REPLICATED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkUpdateCountersGapsClosed(CacheMode cacheMode) throws Exception {
+        testSpi = true;
+
+        int srvCnt = 4;
+
+        startGridsMultiThreaded(srvCnt);
+
+        IgniteEx nearNode = grid(srvCnt - 1);
+
+        IgniteCache<Object, Object> cache = nearNode.createCache(
+            cacheConfiguration(cacheMode, FULL_SYNC, srvCnt - 1, srvCnt)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        IgniteEx primary = grid(0);
+
+        Affinity<Object> aff = nearNode.affinity(cache.getName());
+
+        int[] nearBackupParts = aff.backupPartitions(nearNode.localNode());
+
+        int[] primaryParts = aff.primaryPartitions(primary.localNode());
+
+        Collection<Integer> nearSet = new HashSet<>();
+
+        for (int part : nearBackupParts)
+            nearSet.add(part);
+
+        Collection<Integer> primarySet = new HashSet<>();
+
+        for (int part : primaryParts)
+            primarySet.add(part);
+
+        // We need backup partitions on the near node.
+        nearSet.retainAll(primarySet);
+
+        List<Integer> keys = singlePartKeys(primary.cache(DEFAULT_CACHE_NAME), 20, nearSet.iterator().next());
+
+        int range = 3;
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        List<CacheEntryEvent> arrivedEvts = new ArrayList<>();
+
+        CountDownLatch latch = new CountDownLatch(range * 2);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent e : evts) {
+                    arrivedEvts.add(e);
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        QueryCursor<Cache.Entry<Integer, Integer>> cur = nearNode.cache(DEFAULT_CACHE_NAME).query(qry);
+
+        // prevent first transaction prepare on backups
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(primary);
+
+        spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            private final AtomicInteger limiter = new AtomicInteger();
+
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (msg instanceof GridDhtTxPrepareRequest)
+                    return limiter.getAndIncrement() < srvCnt - 1;
+
+                return false;
+            }
+        });
+
+        Transaction txA = primary.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+        for (int i = 0; i < range; i++)
+            primary.cache(DEFAULT_CACHE_NAME).put(keys.get(i), 2);
+
+        txA.commitAsync();
+
+        GridTestUtils.runAsync(() -> {
+            try (Transaction tx = primary.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                for (int i = range; i < range * 2; i++)
+                    primary.cache(DEFAULT_CACHE_NAME).put(keys.get(i), 1);
+
+                tx.commit();
+            }
+        }).get();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return primary.context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == PREPARING);
+            }
+        }, 3_000);
+
+        GridTestUtils.runAsync(() -> {
+            try (Transaction txB = primary.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                for (int i = range * 2; i < range * 3; i++)
+                    primary.cache(DEFAULT_CACHE_NAME).put(keys.get(i), 3);
+
+                txB.commit();
+            }
+        }).get();
+
+        long primaryUpdCntr = getUpdateCounter(primary, keys.get(0));
+
+        assertEquals(range * 3, primaryUpdCntr);
+
+        // drop primary
+        stopGrid(primary.name());
+
+        // Wait all txs are rolled back.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                boolean allRolledBack = true;
+
+                for (int i = 1; i < srvCnt; i++) {
+                    boolean rolledBack = grid(i).context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == ROLLED_BACK);
+
+                    allRolledBack &= rolledBack;
+                }
+
+                return allRolledBack;
+            }
+        }, 3_000);
+
+        for (int i = 1; i < srvCnt; i++) {
+            IgniteCache backupCache = grid(i).cache(DEFAULT_CACHE_NAME);
+
+            int size = backupCache.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+
+            long backupCntr = getUpdateCounter(grid(i), keys.get(0));
+
+            assertEquals(range * 2, size);
+            assertEquals(primaryUpdCntr, backupCntr);
+        }
+
+        assertTrue(latch.await(5, SECONDS));
+
+        assertEquals(range * 2, arrivedEvts.size());
+
+        cur.close();
+        nearNode.close();
+    }
+
+    /**
+     * @param primaryCache Cache.
+     * @param size Number of keys.
+     * @return Keys belong to a given part.
+     * @throws Exception If failed.
+     */
+    private List<Integer> singlePartKeys(IgniteCache<Object, Object> primaryCache, int size, int part) throws Exception {
+        Ignite ignite = primaryCache.unwrap(Ignite.class);
+
+        List<Integer> res = new ArrayList<>();
+
+        final Affinity<Object> aff = ignite.affinity(primaryCache.getName());
+
+        final ClusterNode node = ignite.cluster().localNode();
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return aff.primaryPartitions(node).length > 0;
+            }
+        }, 5000));
+
+        int cnt = 0;
+
+        for (int key = 0; key < aff.partitions() * size * 10; key++) {
+            if (aff.partition(key) == part) {
+                res.add(key);
+
+                if (++cnt == size)
+                    break;
+            }
+        }
+
+        assertEquals(size, res.size());
+
+        return res;
+    }
+
+    /**
+     * @param node Node.
+     * @param key Key.
+     * @return Extracts update counter of partition which key belongs to.
+     */
+    private long getUpdateCounter(IgniteEx node, Integer key) {
+        int partId = node.cachex(DEFAULT_CACHE_NAME).context().affinity().partition(key);
+
+        GridDhtLocalPartition part = node.cachex(DEFAULT_CACHE_NAME).context().dht().topology().localPartition(partId);
+
+        assert part != null;
+
+        return part.updateCounter();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
index 2aa3419..147eb47 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
@@ -19,10 +19,13 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverMvccTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverMvccTxReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverMvccTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
 
@@ -42,10 +45,13 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverMvccTxSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.class);
 
         suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverMvccTxSelfTest.class);
 
         return suite;
     }