You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/17 13:07:58 UTC

[11/12] incubator-ignite git commit: # send previous value for atomic cache updates on unstable topology

# send previous value for atomic cache updates on unstable topology


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f0b24c47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f0b24c47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f0b24c47

Branch: refs/heads/ignite-426
Commit: f0b24c47a6b5a449a63ca5cf8fdc85811b4cc278
Parents: 4c634ed
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 17 10:22:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 17 13:54:16 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   5 +-
 .../processors/cache/GridCacheMapEntry.java     |  29 +-
 .../dht/GridClientPartitionTopology.java        |   7 +
 .../dht/GridDhtPartitionTopology.java           |   6 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  45 ++-
 .../dht/atomic/GridDhtAtomicCache.java          |  63 +++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   8 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  69 ++++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   5 +-
 .../distributed/near/GridNearAtomicCache.java   |   6 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   3 +-
 ...acheContinuousQueryFailoverAbstractTest.java | 271 ++++++++++++++++---
 12 files changed, 432 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 1b5a717..88ebd48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -407,6 +407,7 @@ public interface GridCacheEntryEx {
      * @param primary If update is performed on primary node (the one which assigns version).
      * @param checkVer Whether update should check current version and ignore update if current version is
      *      greater than passed in.
+     * @param topVer Topology version.
      * @param filter Optional filter to check.
      * @param drType DR type.
      * @param conflictTtl Conflict TTL (if any).
@@ -416,6 +417,7 @@ public interface GridCacheEntryEx {
      * @param intercept If {@code true} then calls cache interceptor.
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
+     * @param prevVal Previous value.
      * @return Tuple where first value is flag showing whether operation succeeded,
      *      second value is old entry value if return value is requested, third is updated entry value,
      *      fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -448,7 +450,8 @@ public interface GridCacheEntryEx {
         boolean conflictResolve,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable CacheObject prevVal
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 283b0b4..e3b25df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1601,7 +1601,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean conflictResolve,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable CacheObject prevVal
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
@@ -1783,6 +1784,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                     "[entry=" + this + ", newVer=" + newVer + ']');
                         }
 
+                        if (!cctx.isNear()) {
+                            CacheObject evtVal;
+
+                            if (op == GridCacheOperation.TRANSFORM) {
+                                EntryProcessor<Object, Object, ?> entryProcessor =
+                                    (EntryProcessor<Object, Object, ?>)writeObj;
+
+                                CacheInvokeEntry<Object, Object> entry =
+                                    new CacheInvokeEntry<>(cctx, key, prevVal, version());
+
+                                try {
+                                    entryProcessor.process(entry, invokeArgs);
+
+                                    evtVal = entry.modified() ?
+                                        cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+                                }
+                                catch (Exception e) {
+                                    evtVal = prevVal;
+                                }
+                            }
+                            else
+                                evtVal = (CacheObject)writeObj;
+
+                            cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false, topVer);
+                        }
+
                         return new GridCacheUpdateAtomicResult(false,
                             retval ? rawGetOrUnmarshalUnlocked(false) : null,
                             null,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5473348..8a7576a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -824,6 +824,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        assert false;
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 2d9771f..9933444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -228,4 +228,10 @@ public interface GridDhtPartitionTopology {
      * @param threshold Threshold for number of entries.
      */
     public void printMemoryStats(int threshold);
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code True} if rebalance process finished.
+     */
+    public boolean rebalanceFinished(AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 93f085c..1c71ff1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -85,6 +85,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Partition update counter. */
     private Map<Integer, Long> cntrMap = new HashMap<>();
 
+    /** Topology version for which rebalance finished, {@code NONE} if it has not finished yet. */
+    private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
     /**
      * @param cctx Context.
      */
@@ -114,6 +117,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             topReadyFut = null;
 
             topVer = AffinityTopologyVersion.NONE;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -203,6 +208,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             updateSeq.setIfGreater(updSeq);
 
             topReadyFut = exchFut;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -508,6 +515,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
+            updateRebalanceVersion();
+
             consistencyCheck();
         }
         finally {
@@ -690,7 +699,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param states Additional partition states.
      * @return List of nodes for the partition.
      */
-    private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+    private List<ClusterNode> nodes(int p,
+        AffinityTopologyVersion topVer,
+        GridDhtPartitionState state,
+        GridDhtPartitionState... states) {
         Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
 
         lock.readLock().lock();
@@ -888,6 +900,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             boolean changed = checkEvictions(updateSeq);
 
+            updateRebalanceVersion();
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -1000,6 +1014,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             changed |= checkEvictions(updateSeq);
 
+            updateRebalanceVersion();
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -1196,6 +1212,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (part.own()) {
                 updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
 
+                updateRebalanceVersion();
+
                 consistencyCheck();
 
                 return true;
@@ -1268,6 +1286,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        return topVer.equals(rebalancedTopVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
@@ -1280,6 +1303,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
+     * Records the current topology version as rebalanced once every partition's owners match its affinity nodes.
+     */
+    private void updateRebalanceVersion() {
+        if (!rebalancedTopVer.equals(topVer)) {
+            for (int i = 0; i < cctx.affinity().partitions(); i++) {
+                List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+                List<ClusterNode> owners = owners(i);
+
+                if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+                    return;
+            }
+
+            rebalancedTopVer = topVer;
+
+            if (log.isDebugEnabled())
+                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+        }
+    }
+
+    /**
      * @param p Partition.
      * @param nodeId Node ID.
      * @param match State to match.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index eb4d51c..6c05bfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1057,7 +1057,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
             try {
-                topology().readLock();
+                GridDhtPartitionTopology top = topology();
+
+                top.readLock();
 
                 try {
                     if (topology().stopping()) {
@@ -1074,7 +1076,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
                     if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
-                        !needRemap(req.topologyVersion(), topology().topologyVersion())) {
+                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
                         if (node == null) {
@@ -1089,7 +1091,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (ver == null) {
                             // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(topology().topologyVersion());
+                            ver = ctx.versions().next(top.topologyVersion());
 
                             if (hasNear)
                                 res.nearVersion(ver);
@@ -1101,6 +1103,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             log.debug("Using cache version for update request on primary node [ver=" + ver +
                                 ", req=" + req + ']');
 
+                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
                         dhtFut = createDhtFuture(ver, req, res, completionCb, false);
 
                         expiry = expiryPolicy(req.expiry());
@@ -1123,7 +1127,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
@@ -1142,7 +1147,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             retVal = updRes.returnValue();
                             deleted = updRes.deleted();
@@ -1162,7 +1168,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         remap = true;
                 }
                 finally {
-                    topology().readUnlock();
+                    top.readUnlock();
                 }
             }
             catch (GridCacheEntryRemovedException e) {
@@ -1245,6 +1251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param replicate Whether replication is enabled.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Deleted entries.
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
@@ -1260,7 +1267,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) throws GridCacheEntryRemovedException {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1407,7 +1415,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 replicate,
                                 updRes,
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             firstEntryIdx = i;
 
@@ -1455,7 +1464,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 replicate,
                                 updRes,
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             firstEntryIdx = i;
 
@@ -1574,7 +1584,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 replicate,
                 updRes,
                 taskName,
-                expiry);
+                expiry,
+                sndPrevVal);
         }
         else
             assert filtered.isEmpty();
@@ -1650,6 +1661,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param replicate Whether DR is enabled for that cache.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
@@ -1664,7 +1676,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) throws GridCacheEntryRemovedException {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1721,7 +1734,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    req.returnValue(),
+                    sndPrevVal || req.returnValue(),
                     expiry,
                     true,
                     true,
@@ -1736,7 +1749,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     true,
                     intercept,
                     req.subjectId(),
-                    taskName);
+                    taskName,
+                    null);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1759,7 +1773,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 op == TRANSFORM ? req.entryProcessor(i) : null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
-                                newConflictVer);
+                                newConflictVer,
+                                sndPrevVal,
+                                updRes.oldValue());
                         }
 
                         if (!F.isEmpty(filteredReaders))
@@ -1865,6 +1881,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param batchRes Batch update result.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Deleted entries.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
@@ -1885,7 +1902,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean replicate,
         UpdateBatchResult batchRes,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -1987,7 +2005,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/false,
+                        /*retval*/sndPrevVal,
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
@@ -2002,7 +2020,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         /*conflict resolve*/false,
                         /*intercept*/false,
                         req.subjectId(),
-                        taskName);
+                        taskName,
+                        null);
 
                     assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2038,7 +2057,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE,
-                                null);
+                                null,
+                                sndPrevVal,
+                                updRes.oldValue());
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
@@ -2423,7 +2444,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             processDhtAtomicUpdateRequest0(nodeId, req);
         else {
             fut.listen(new CI1<IgniteInternalFuture>() {
-                @Override public void apply(IgniteInternalFuture future) {
+                @Override public void apply(IgniteInternalFuture fut) {
                     processDhtAtomicUpdateRequest0(nodeId, req);
                 }
             });
@@ -2461,6 +2482,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         entry = entryExx(key);
 
                         CacheObject val = req.value(i);
+                        CacheObject prevVal = req.previousValue(i);
                         EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
 
                         GridCacheOperation op = entryProcessor != null ? TRANSFORM :
@@ -2493,7 +2515,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             false,
                             intercept,
                             req.subjectId(),
-                            taskName);
+                            taskName,
+                            prevVal);
 
                         if (updRes.removeVersion() != null) {
                             if (ctx.deferredDelete())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 601f1d8..d983e88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -210,7 +210,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         int part = entry.partition();
@@ -254,7 +256,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     entryProcessor,
                     ttl,
                     conflictExpireTime,
-                    conflictVer);
+                    conflictVer,
+                    addPrevVal,
+                    prevVal);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 6340c93..2f92fde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -71,6 +71,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
+    /** Previous values. */
+    @GridToStringInclude
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> prevVals;
+
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -208,13 +213,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param prevVal Previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal) {
         keys.add(key);
 
         if (forceTransformBackups) {
@@ -225,6 +234,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         else
             vals.add(val);
 
+        if (addPrevVal) {
+            if (prevVals == null)
+                prevVals = new ArrayList<>();
+
+            prevVals.add(prevVal);
+        }
+
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
             if (conflictVers == null) {
@@ -419,6 +435,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
+     * @return Previous value stored at the given index or {@code null} if no previous values were added.
+     */
+    @Nullable public CacheObject previousValue(int idx) {
+        if (prevVals != null)
+            return prevVals.get(idx);
+
+        return null;
+    }
+
+    /**
+     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -670,42 +697,48 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("ttls", ttls))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("ttls", ttls))
                     return false;
 
                 writer.incrementState();
 
             case 23:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -840,7 +873,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 17:
-                subjId = reader.readUuid("subjId");
+                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -848,6 +881,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 18:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -859,7 +900,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -867,7 +908,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -875,7 +916,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -883,7 +924,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -891,7 +932,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -911,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 24;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 66f0300..eec7fa0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -129,7 +129,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     private final boolean rawRetval;
 
     /** Fast map flag. */
-    private final boolean fastMap;
+    private boolean fastMap;
 
     /** */
     private boolean fastMapRemap;
@@ -696,6 +696,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             return;
         }
 
+        if (fastMap && futVer == null)
+            fastMap = cctx.topology().rebalanceFinished(topVer);
+
         if (futVer == null)
             // Assign future version in topology read lock before first exception may be thrown.
             futVer = cctx.versions().next(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2255988..cf68d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -239,7 +239,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         false,
                         false,
                         subjId,
-                        taskName);
+                        taskName,
+                        null);
 
                     if (updRes.removeVersion() != null) {
                         if (ctx.deferredDelete())
@@ -341,7 +342,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             false,
                             /*intercept*/false,
                             req.subjectId(),
-                            taskName);
+                            taskName,
+                            null);
 
                         if (updRes.removeVersion() != null) {
                             if (ctx.deferredDelete())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 0055557..0dd10ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -503,7 +503,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean conflictResolve,
         boolean intercept,
         UUID subjId,
-        String taskName) throws IgniteCheckedException,
+        String taskName,
+        @Nullable CacheObject prevVal) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index e6f3bd7..151ae33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -25,9 +25,12 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
@@ -135,8 +138,91 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
+    public void testRebalanceVersion() throws Exception {
+        Ignite ignite0 = startGrid(0);
+        GridDhtPartitionTopology top0 = ((IgniteKernal)ignite0).context().cache().context().cacheContext(1).topology();
+
+        assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1)));
+        assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2)));
+
+        Ignite ignite1 = startGrid(1);
+        GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(1).topology();
+
+        waitRebalanceFinished(ignite0, 2);
+        waitRebalanceFinished(ignite1, 2);
+
+        assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3)));
+        assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3)));
+
+        Ignite ignite2 = startGrid(2);
+        GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(1).topology();
+
+        waitRebalanceFinished(ignite0, 3);
+        waitRebalanceFinished(ignite1, 3);
+        waitRebalanceFinished(ignite2, 3);
+
+        assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+        client = true;
+
+        Ignite ignite3 = startGrid(3);
+        GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(1).topology();
+
+        assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+        stopGrid(1);
+
+        waitRebalanceFinished(ignite0, 5);
+        waitRebalanceFinished(ignite2, 5);
+        waitRebalanceFinished(ignite3, 5);
+
+        stopGrid(3);
+
+        assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(6)));
+        assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(6)));
+
+        stopGrid(0);
+
+        waitRebalanceFinished(ignite2, 7);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param topVer Topology version.
+     * @throws Exception If failed.
+     */
+    private void waitRebalanceFinished(Ignite ignite, long topVer) throws Exception {
+        final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
+
+        final GridDhtPartitionTopology top =
+            ((IgniteKernal)ignite).context().cache().context().cacheContext(1).topology();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return top.rebalanceFinished(topVer0);
+            }
+        }, 5000);
+
+        assertTrue(top.rebalanceFinished(topVer0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testOneBackup() throws Exception {
-        checkBackupQueue(1);
+        checkBackupQueue(1, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOneBackupClientUpdate() throws Exception {
+        checkBackupQueue(1, true);
     }
 
     /**
@@ -146,14 +232,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         if (cacheMode() == REPLICATED)
             return;
 
-        checkBackupQueue(3);
+        checkBackupQueue(3, false);
     }
 
     /**
      * @param backups Number of backups.
+     * @param updateFromClient If {@code true} executes cache update from client node.
      * @throws Exception If failed.
      */
-    private void checkBackupQueue(int backups) throws Exception {
+    private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
         this.backups = backups;
 
         final int SRV_NODES = 4;
@@ -183,6 +270,10 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         int PARTS = 10;
 
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
         for (int i = 0; i < SRV_NODES - 1; i++) {
             log.info("Stop iteration: " + i);
 
@@ -203,7 +294,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             for (Integer key : keys) {
                 log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
 
-                cache.put(key, key);
+                T2<Object, Object> t = updates.get(key);
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)key, null));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)key, (Object)key));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+                }
+
+                if (updateFromClient)
+                    qryClientCache.put(key, key);
+                else
+                    cache.put(key, key);
 
                 if (first) {
                     spi.skipMsg = true;
@@ -223,6 +330,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                 fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
             }
+
+            checkEvents(expEvts, lsnr);
         }
 
         for (int i = 0; i < SRV_NODES - 1; i++) {
@@ -241,7 +350,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             for (Integer key : keys) {
                 log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
 
-                cache.put(key, key);
+                T2<Object, Object> t = updates.get(key);
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)key, null));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)key, (Object)key));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+                }
+
+                if (updateFromClient)
+                    qryClientCache.put(key, key);
+                else
+                    cache.put(key, key);
             }
 
             if (!latch.await(5, SECONDS)) {
@@ -253,6 +378,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                 fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
             }
+
+            checkEvents(expEvts, lsnr);
         }
 
         cur.close();
@@ -261,6 +388,24 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     }
 
     /**
+     * @param expEvts Expected events as (key, value, old value) tuples, cleared after the check.
+     * @param lsnr Listener.
+     */
+    private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) {
+        for (T3<Object, Object, Object> exp : expEvts) {
+            CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+            assertNotNull("No event for key: " + exp.get1(), e);
+            assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+            assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue());
+        }
+
+        expEvts.clear();
+
+        lsnr.evts.clear();
+    }
+
+    /**
      * @param cache Cache.
      * @param parts Number of partitions.
      * @return Keys.
@@ -449,7 +594,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    public void _testFailover() throws Exception {
+    public void testFailover() throws Exception {
         final int SRV_NODES = 4;
 
         startGridsMultiThreaded(SRV_NODES);
@@ -483,14 +628,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     startGrid(idx);
 
-                    Thread.sleep(2000);
+                    Thread.sleep(3000);
 
                     log.info("Stop node: " + idx);
 
                     stopGrid(idx);
 
-                    Thread.sleep(1000);
-
                     CountDownLatch latch = new CountDownLatch(1);
 
                     assertTrue(checkLatch.compareAndSet(null, latch));
@@ -508,6 +651,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         final Map<Integer, Integer> vals = new HashMap<>();
 
+        final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
         try {
             long stopTime = System.currentTimeMillis() + 3 * 60_000;
 
@@ -518,6 +663,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             while (System.currentTimeMillis() < stopTime) {
                 Integer key = rnd.nextInt(PARTS);
 
+                Integer prevVal = vals.get(key);
                 Integer val = vals.get(key);
 
                 if (val == null)
@@ -529,6 +675,16 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                 vals.put(key, val);
 
+                List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+                if (keyEvts == null) {
+                    keyEvts = new ArrayList<>();
+
+                    expEvts.put(key, keyEvts);
+                }
+
+                keyEvts.add(new T2<>(val, prevVal));
+
                 CountDownLatch latch = checkLatch.get();
 
                 if (latch != null) {
@@ -544,12 +700,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                         boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
                             @Override public boolean apply() {
-                                return checkEvents(false, vals, lsnr);
+                                return checkEvents(false, expEvts, lsnr);
                             }
                         }, 10_000);
 
                         if (!check)
-                            assertTrue(checkEvents(true, vals, lsnr));
+                            assertTrue(checkEvents(true, expEvts, lsnr));
 
                         success = true;
 
@@ -577,12 +733,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return checkEvents(false, vals, lsnr);
+                return checkEvents(false, expEvts, lsnr);
             }
         }, 10_000);
 
         if (!check)
-            assertTrue(checkEvents(true, vals, lsnr));
+            assertTrue(checkEvents(true, expEvts, lsnr));
 
         cur.close();
 
@@ -591,47 +747,64 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
     /**
      * @param logAll If {@code true} logs all unexpected values.
-     * @param vals Expected values.
+     * @param expEvts Expected events per key, each a (new value, old value) pair.
      * @param lsnr Listener.
      * @return Check status.
      */
-    private boolean checkEvents(boolean logAll, Map<Integer, Integer> vals, CacheEventListener2 lsnr) {
-        assertTrue(!vals.isEmpty());
-
-        ConcurrentHashMap<Integer, Integer> lsnrVals = lsnr.vals;
-
-        ConcurrentHashMap<Integer, Integer> lsnrCntrs = lsnr.cntrs;
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    private boolean checkEvents(boolean logAll,
+        Map<Integer, List<T2<Integer, Integer>>> expEvts,
+        CacheEventListener2 lsnr) {
+        assertTrue(!expEvts.isEmpty());
 
         boolean pass = true;
 
-        for (Map.Entry<Integer, Integer> e : vals.entrySet()) {
+        for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
             Integer key = e.getKey();
+            List<T2<Integer, Integer>> exp = e.getValue();
 
-            Integer lsnrVal = lsnrVals.get(key);
-            Integer expVal = e.getValue();
+            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
 
-            if (!expVal.equals(lsnrVal)) {
+            if (rcvdEvts == null) {
                 pass = false;
 
-                log.info("Unexpected value [key=" + key + ", val=" + lsnrVal + ", expVal=" + expVal + ']');
+                log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
 
                 if (!logAll)
                     return false;
             }
+            else {
+                synchronized (rcvdEvts) {
+                    if (rcvdEvts.size() != exp.size()) {
+                        pass = false;
 
-            Integer lsnrCntr = lsnrCntrs.get(key);
-            Integer expCntr = expVal + 1;
+                        log.info("Missed or extra events for key [key=" + key +
+                            ", exp=" + e.getValue() +
+                            ", rcvd=" + rcvdEvts + ']');
 
-            if (!expCntr.equals(lsnrCntr)) {
-                pass = false;
+                        if (!logAll)
+                            return false;
+                    }
 
-                log.info("Unexpected events count [key=" + key + ", val=" + lsnrCntr + ", expVal=" + expCntr + ']');
+                    int cnt = Math.min(rcvdEvts.size(), exp.size());
 
-                if (!logAll)
-                    return false;
+                    for (int i = 0; i < cnt; i++) {
+                        T2<Integer, Integer> expEvt = exp.get(i);
+                        CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
+
+                        assertEquals(key, rcvdEvt.getKey());
+                        assertEquals(expEvt.get1(), rcvdEvt.getValue());
+                        assertEquals(expEvt.get2(), rcvdEvt.getOldValue());
+                    }
+                }
             }
         }
 
+        if (pass) {
+            expEvts.clear();
+            lsnr.evts.clear();
+        }
+
         return pass;
     }
 
@@ -646,6 +819,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
 
         /** */
+        private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+        /** */
         @LoggerResource
         private IgniteLogger log;
 
@@ -657,6 +833,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null));
 
+                    this.evts.put(evt.getKey(), evt);
+
                     keys.add((Integer) evt.getKey());
 
                     assertTrue(latch != null);
@@ -691,7 +869,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
 
         /** */
-        private final ConcurrentHashMap<Integer, Integer> cntrs = new ConcurrentHashMap<>();
+        private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
 
         /** {@inheritDoc} */
         @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
@@ -706,25 +884,34 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     Integer prevVal = vals.get(key);
 
+                    boolean dup = false;
+
                     if (prevVal != null) {
-                        assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
-                        assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+                        if (prevVal.equals(val)) // Can get this event with automatic put retry.
+                            dup = true;
+                        else {
+                            assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
+                            assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+                        }
                     }
                     else {
                         assertEquals("Unexpected event: " + evt, (Object)0, val);
                         assertNull("Unexpected event: " + evt, evt.getOldValue());
                     }
 
-                    vals.put(key, val);
+                    if (!dup) {
+                        vals.put(key, val);
 
-                    Integer cntr = cntrs.get(key);
+                        List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key);
 
-                    if (cntr == null)
-                        cntr = 1;
-                    else
-                        cntr = cntr + 1;
+                        if (keyEvts == null) {
+                            keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
 
-                    cntrs.put(key, cntr);
+                            this.evts.put(key, keyEvts);
+                        }
+
+                        keyEvts.add(evt);
+                    }
                 }
             }
             catch (Throwable e) {