You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/01 05:55:21 UTC

[11/21] ignite git commit: IGNITE-7764: MVCC: cache API support. This closes #4725.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b8c78bd..9493510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -72,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyRollbackOnlyImpl;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
@@ -191,11 +192,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     /** */
     private MvccQueryTracker mvccTracker;
 
-    /** Whether this transaction is for SQL operations or not.<p>
+    /** Whether this is an MVCC transaction or not.<p>
      * {@code null} means there haven't been any calls made on this transaction, and first operation will give this
      * field actual value.
      */
-    private Boolean sql;
+    private Boolean mvccOp;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -205,7 +206,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
-     * @param ctx   Cache registry.
+     * @param ctx Cache registry.
      * @param implicit Implicit flag.
      * @param implicitSingle Implicit with one key flag.
      * @param sys System flag.
@@ -214,7 +215,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @param isolation Isolation.
      * @param timeout Timeout.
      * @param storeEnabled Store enabled flag.
-     * @param sql Whether this transaction was started via SQL API or not, or {@code null} if unknown.
+     * @param mvccOp Whether this is an MVCC transaction or not, or {@code null} if unknown.
      * @param txSize Transaction size.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
@@ -230,7 +231,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         TransactionIsolation isolation,
         long timeout,
         boolean storeEnabled,
-        Boolean sql,
+        Boolean mvccOp,
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash,
@@ -257,7 +258,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
         mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl();
 
-        this.sql = sql;
+        this.mvccOp = mvccOp;
 
         initResult();
 
@@ -574,6 +575,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     ) {
         assert key != null;
 
+        if (cacheCtx.mvccEnabled())
+            return mvccPutAllAsync0(cacheCtx, Collections.singletonMap(key, val),
+                entryProcessor == null ? null : Collections.singletonMap(key, entryProcessor), invokeArgs, retval, filter);
+
         try {
             beforePut(cacheCtx, retval, false);
 
@@ -628,7 +633,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                 if (log.isDebugEnabled())
                     log.debug("Before acquiring transaction lock for put on key: " + enlisted);
 
-                IgniteInternalFuture<Boolean>fut = cacheCtx.cache().txLockAsync(enlisted,
+                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
                     timeout,
                     this,
                     /*read*/entryProcessor != null, // Needed to force load from store.
@@ -696,6 +701,142 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
+     * Validate Tx mode.
+     *
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If tx mode is not supported.
+     */
+    protected void validateTxMode(GridCacheContext ctx) throws IgniteCheckedException {
+        if(!ctx.mvccEnabled() || pessimistic() && repeatableRead())
+            return;
+
+        throw new IgniteCheckedException("Only pessimistic repeatable read transactions are supported at the moment.");
+    }
+
+    /**
+     * Internal method for put and transform operations in Mvcc mode.
+     * Note: only one of the {@code map} and {@code invokeMap} maps must be non-null.
+     *
+     * @param cacheCtx Context.
+     * @param map Key-value map to store.
+     * @param invokeMap Invoke map.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @return Operation future.
+     */
+    private <K, V> IgniteInternalFuture mvccPutAllAsync0(
+        final GridCacheContext cacheCtx,
+        @Nullable Map<? extends K, ? extends V> map,
+        @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+        @Nullable final Object[] invokeArgs,
+        final boolean retval,
+        @Nullable final CacheEntryPredicate filter
+    ) {
+        try {
+            validateTxMode(cacheCtx);
+
+            // TODO: IGNITE-9540: Fix invoke/invokeAll.
+            if(invokeMap != null)
+                MvccUtils.verifyMvccOperationSupport(cacheCtx, "invoke/invokeAll");
+
+            if (mvccSnapshot == null) {
+                MvccUtils.mvccTracker(cacheCtx, this);
+
+                assert mvccSnapshot != null;
+            }
+
+            beforePut(cacheCtx, retval, true);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
+        // Cached entry may be passed only from entry wrapper.
+        final Map<?, ?> map0 = map;
+        final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+
+        if (log.isDebugEnabled())
+            log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+
+        assert map0 != null || invokeMap0 != null;
+
+        if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+            if (implicit())
+                try {
+                    commit();
+                }
+                catch (IgniteCheckedException e) {
+                    return new GridFinishedFuture<>(e);
+                }
+
+            return new GridFinishedFuture<>(new GridCacheReturn(true, false));
+        }
+
+        try {
+            // Set transform flag for transaction.
+            if (invokeMap != null)
+                transform = true;
+
+            Set<?> keys = map0 != null ? map0.keySet() : invokeMap0.keySet();
+
+            final Map<KeyCacheObject, CacheObject> enlisted = new HashMap<>(keys.size());
+
+            for (Object key : keys) {
+                if (isRollbackOnly())
+                    return new GridFinishedFuture<>(timedOut() ? timeoutException() : rollbackException());
+
+                if (key == null) {
+                    rollback();
+
+                    throw new NullPointerException("Null key.");
+                }
+
+                Object val = map0 == null ? null : map0.get(key);
+                EntryProcessor entryProcessor = transform ? invokeMap.get(key) : null;
+
+                if (val == null && entryProcessor == null) {
+                    setRollbackOnly();
+
+                    throw new NullPointerException("Null value.");
+                }
+
+                KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+                CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+                enlisted.put(cacheKey, cacheVal);
+            }
+
+            return updateAsync(cacheCtx, new UpdateSourceIterator<IgniteBiTuple<KeyCacheObject, CacheObject>>() {
+
+                private Iterator<Map.Entry<KeyCacheObject, CacheObject>> it = enlisted.entrySet().iterator();
+
+                @Override public EnlistOperation operation() {
+                    return EnlistOperation.UPSERT;
+                }
+
+                @Override public boolean hasNextX() throws IgniteCheckedException {
+                    return it.hasNext();
+                }
+
+                @Override public IgniteBiTuple<KeyCacheObject, CacheObject> nextX() throws IgniteCheckedException {
+                    Map.Entry<KeyCacheObject, CacheObject> next = it.next();
+
+                    return new IgniteBiTuple<>(next.getKey(), next.getValue());
+                }
+            }, retval, filter, remainingTime(), true);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+        catch (RuntimeException e) {
+            onException();
+
+            throw e;
+        }
+    }
+
+    /**
      * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
      * maps must be non-null.
      *
@@ -717,6 +858,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
         final boolean retval
     ) {
+        if (cacheCtx.mvccEnabled())
+            return mvccPutAllAsync0(cacheCtx, map, invokeMap, invokeArgs, retval, null);
+
         try {
             beforePut(cacheCtx, retval, false);
         }
@@ -1549,6 +1693,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         final boolean retval,
         @Nullable final CacheEntryPredicate filter,
         boolean singleRmv) {
+        if(cacheCtx.mvccEnabled())
+            return mvccRemoveAllAsync0(cacheCtx, keys, retval, filter);
+
         try {
             checkUpdatesAllowed(cacheCtx);
         }
@@ -1558,9 +1705,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
         cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
-        if (cacheCtx.mvccEnabled() && !isOperationAllowed(false))
-            return txTypeMismatchFinishFuture();
-
         if (retval)
             needReturnValue(true);
 
@@ -1690,9 +1834,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                 -1L);
 
             PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                /** {@inheritDoc} */
                 @Override protected GridCacheReturn postLock(GridCacheReturn ret)
-                    throws IgniteCheckedException
-                {
+                    throws IgniteCheckedException {
                     if (log.isDebugEnabled())
                         log.debug("Acquired transaction lock for remove on keys: " + enlisted);
 
@@ -1769,6 +1913,93 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
+     * Internal method for remove operations in Mvcc mode.
+     *
+     * @param cacheCtx Cache context.
+     * @param keys Keys to remove.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param filter Filter.
+     * @return Future for asynchronous remove.
+     */
+    @SuppressWarnings("unchecked")
+    private <K, V> IgniteInternalFuture<GridCacheReturn> mvccRemoveAllAsync0(
+        final GridCacheContext cacheCtx,
+        @Nullable final Collection<? extends K> keys,
+        final boolean retval,
+        @Nullable final CacheEntryPredicate filter
+    ) {
+        try {
+            validateTxMode(cacheCtx);
+
+            if (mvccSnapshot == null) {
+                MvccUtils.mvccTracker(cacheCtx, this);
+
+                assert mvccSnapshot != null;
+            }
+
+            beforeRemove(cacheCtx, retval, true);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
+        if (F.isEmpty(keys)) {
+            if (implicit()) {
+                try {
+                    commit();
+                }
+                catch (IgniteCheckedException e) {
+                    return new GridFinishedFuture<>(e);
+                }
+            }
+
+            return new GridFinishedFuture<>(new GridCacheReturn(localResult(), true));
+        }
+
+        init();
+
+        Set<KeyCacheObject> enlisted = new HashSet<>(keys.size());
+
+        try {
+            for (Object key : keys) {
+                if (isRollbackOnly())
+                    return new GridFinishedFuture<>(timedOut() ? timeoutException() : rollbackException());
+
+                if (key == null) {
+                    rollback();
+
+                    throw new NullPointerException("Null key.");
+                }
+
+                KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+                enlisted.add(cacheKey);
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
+        return updateAsync(cacheCtx, new UpdateSourceIterator<KeyCacheObject>() {
+
+            private Iterator<KeyCacheObject> it = enlisted.iterator();
+
+            @Override public EnlistOperation operation() {
+                return EnlistOperation.DELETE;
+            }
+
+            @Override public boolean hasNextX() throws IgniteCheckedException {
+                return it.hasNext();
+            }
+
+            @Override public KeyCacheObject nextX() throws IgniteCheckedException {
+                return it.next();
+            }
+        }, retval, filter, remainingTime(), true);
+    }
+
+    /**
      * @param cctx Cache context.
      * @return Mvcc snapshot for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs).
      */
@@ -1846,10 +2077,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
+     * Executes key-value update operation in Mvcc mode.
+     *
+     * @param cacheCtx Cache context.
+     * @param it Entries iterator.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @param timeout Timeout.
+     * @param sequential Sequential locking flag.
+     * @return Operation future.
+     */
+    private IgniteInternalFuture<GridCacheReturn> updateAsync(GridCacheContext cacheCtx,
+        UpdateSourceIterator<?> it,
+        boolean retval,
+        @Nullable CacheEntryPredicate filter,
+        long timeout,
+        boolean sequential) {
+        try {
+            final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+            final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+            /* TODO: IGNITE-9688: 'sequential' is always true here which can slow down bulk operations,
+             but possibly we can safely optimize this. */
+
+            GridNearTxEnlistFuture fut = new GridNearTxEnlistFuture(cacheCtx, this,
+                timeout, it, 0, sequential, filter, retval);
+
+            fut.init();
+
+            return nonInterruptable(new GridEmbeddedFuture<>(fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Boolean>() {
+                @Override public Boolean applyx(IgniteInternalFuture<GridCacheReturn> fut0) throws IgniteCheckedException {
+                    fut0.get();
+
+                    return true;
+                }
+            }), new PLC1<GridCacheReturn>(null) {
+                @Override protected GridCacheReturn postLock(GridCacheReturn ret) throws IgniteCheckedException {
+                    GridCacheReturn futRes = fut.get();
+
+                    assert futRes != null;
+
+                    mvccSnapshot.incrementOperationCounter();
+
+                    return new GridCacheReturn(cacheCtx, true, keepBinary, futRes.value(), futRes.success());
+                }
+            }));
+        }
+        catch (RuntimeException e) {
+            onException();
+
+            throw e;
+        }
+    }
+
+    /**
+     * Executes update query operation in Mvcc mode.
+     *
      * @param fut Enlist future.
      * @return Operation future.
      */
-    public IgniteInternalFuture<Long> updateAsync(GridNearTxAbstractEnlistFuture fut) {
+    public IgniteInternalFuture<Long> updateAsync(GridNearTxQueryAbstractEnlistFuture fut) {
         fut.init();
 
         return nonInterruptable(new GridEmbeddedFuture<>(fut.chain(new CX1<IgniteInternalFuture<Long>, Boolean>() {
@@ -1900,36 +2188,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
 
-        if (cacheCtx.mvccEnabled() && !isOperationAllowed(false))
+        try {
+            validateTxMode(cacheCtx);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
+        if (cacheCtx.mvccEnabled() && !isOperationAllowed(true))
             return txTypeMismatchFinishFuture();
 
         init();
 
-        if (cacheCtx.mvccEnabled() && (optimistic() && !readCommitted()) && mvccTracker == null) {
-            // TODO IGNITE-7388: support async tx rollback (e.g. on timeout).
-            boolean canRemap = cctx.lockedTopologyVersion(null) == null;
-
-            mvccTracker = new MvccQueryTrackerImpl(cacheCtx, canRemap);
-
-            return new GridEmbeddedFuture<>(mvccTracker.requestSnapshot(topologyVersion()),
-                new IgniteBiClosure<MvccSnapshot, Exception, IgniteInternalFuture<Map<K, V>>>() {
-                @Override public IgniteInternalFuture<Map<K, V>> apply(MvccSnapshot snapshot, Exception e) {
-                    if (e != null)
-                        return new GridFinishedFuture<>(e);
-
-                    return getAllAsync(cacheCtx,
-                        entryTopVer,
-                        keys,
-                        deserializeBinary,
-                        skipVals,
-                        keepCacheObjects,
-                        skipStore,
-                        recovery,
-                        needVer);
-                }
-            });
-        }
-
         int keysCnt = keys.size();
 
         boolean single = keysCnt == 1;
@@ -2234,8 +2504,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @param keepCacheObjects Keep cache objects flag.
      * @param skipStore Skip store flag.
      * @param recovery Recovery flag..
-     * @throws IgniteCheckedException If failed.
      * @return Enlisted keys.
+     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"RedundantTypeArguments"})
     private <K, V> Collection<KeyCacheObject> enlistRead(
@@ -2568,8 +2838,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
             new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
                 @Override public void apply(KeyCacheObject key,
-                                            @Nullable Object val,
-                                            @Nullable GridCacheVersion loadVer) {
+                    @Nullable Object val,
+                    @Nullable GridCacheVersion loadVer) {
                     if (log.isDebugEnabled())
                         log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
 
@@ -2729,7 +2999,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
-     * @param cacheCtx  Cache context.
+     * @param cacheCtx Cache context.
      * @param readThrough Read through flag.
      * @param async if {@code True}, then loading will happen in a separate thread.
      * @param keys Keys.
@@ -2868,7 +3138,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
-     * @param cacheCtx  Cache context.
+     * @param cacheCtx Cache context.
      * @param readThrough Read through flag.
      * @param async if {@code True}, then loading will happen in a separate thread.
      * @param keys Keys.
@@ -3498,7 +3768,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         if (log.isDebugEnabled())
             log.debug("Committing near local tx: " + this);
 
-        final NearTxFinishFuture fut, fut0 = finishFut; boolean fastFinish;
+        final NearTxFinishFuture fut;
+        final NearTxFinishFuture fut0 = finishFut;
+
+        boolean fastFinish;
 
         if (fut0 != null || !FINISH_FUT_UPD.compareAndSet(this, null, fut = finishFuture(fastFinish = fastFinish(), true)))
             return chainFinishFuture(finishFut, true, true, false);
@@ -3577,9 +3850,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         IgniteInternalFuture<?> prepFut = this.prepFut;
 
         if (onTimeout && prepFut instanceof GridNearTxPrepareFutureAdapter && !prepFut.isDone())
-            ((GridNearTxPrepareFutureAdapter) prepFut).onNearTxLocalTimeout();
+            ((GridNearTxPrepareFutureAdapter)prepFut).onNearTxLocalTimeout();
+
+        final NearTxFinishFuture fut;
+        final NearTxFinishFuture fut0 = finishFut;
 
-        final NearTxFinishFuture fut, fut0 = finishFut; boolean fastFinish;
+        boolean fastFinish;
 
         if (fut0 != null)
             return chainFinishFuture(finishFut, false, clearThreadMap, onTimeout);
@@ -3627,9 +3903,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
-     * @return Transaction commit future.
+     * Finish transaction.
+     *
      * @param fast {@code True} in case of fast finish.
      * @param commit {@code True} if commit.
+     * @return Transaction commit future.
      */
     private NearTxFinishFuture finishFuture(boolean fast, boolean commit) {
         NearTxFinishFuture fut = fast ? new GridNearTxFastFinishFuture(this, commit) :
@@ -3724,7 +4002,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @return {@code True} if 'fast finish' path can be used for transaction completion.
      */
     private boolean fastFinish() {
-        return writeMap().isEmpty()
+        return writeMap().isEmpty() && !queryEnlisted()
             && ((optimistic() && !serializable()) || readMap().isEmpty())
             && (mappings.single() || F.view(mappings.mappings(), CU.FILTER_QUERY_MAPPING).isEmpty());
     }
@@ -4174,14 +4452,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @return {@code true} if this transaction does not have type flag set or it matches invoking operation,
      * {@code false} otherwise.
      */
-    public boolean isOperationAllowed(boolean sqlOp) {
-        if (sql == null) {
-            sql = sqlOp;
+    public boolean isOperationAllowed(boolean mvccOp) {
+        if (this.mvccOp == null) {
+            this.mvccOp = mvccOp;
 
             return true;
         }
 
-        return sql == sqlOp;
+        return this.mvccOp == mvccOp;
     }
 
     /**
@@ -4385,17 +4663,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     /**
      * @param cacheCtx Cache context.
      * @param retval Return value flag.
-     * @param sql SQL operation flag.
+     * @param mvccOp MVCC operation flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void beforePut(GridCacheContext cacheCtx, boolean retval, boolean sql) throws IgniteCheckedException {
-        assert !sql || cacheCtx.mvccEnabled();
+    private void beforePut(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException {
+        assert !mvccOp || cacheCtx.mvccEnabled();
 
         checkUpdatesAllowed(cacheCtx);
 
         cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
 
-        if (cacheCtx.mvccEnabled() && !isOperationAllowed(sql))
+        if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp))
             throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG);
 
         if (retval)
@@ -4408,6 +4686,28 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
     /**
      * @param cacheCtx Cache context.
+     * @param retval Return value flag.
+     * @param mvccOp MVCC operation flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void beforeRemove(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException {
+        assert !mvccOp || cacheCtx.mvccEnabled();
+
+        checkUpdatesAllowed(cacheCtx);
+
+        cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+
+        if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp))
+            throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG);
+
+        if (retval)
+            needReturnValue(true);
+
+        checkValid();
+    }
+
+    /**
+     * @param cacheCtx Cache context.
      * @throws IgniteCheckedException If updates are not allowed.
      */
     private void checkUpdatesAllowed(GridCacheContext cacheCtx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
new file mode 100644
index 0000000..714c62d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
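+ * Abstract enlist future with a {@code Long} result; passes {@code CU.longReducer()} to the parent as the reducer.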
+ */
+public abstract class GridNearTxQueryAbstractEnlistFuture extends GridNearTxAbstractEnlistFuture<Long> {
+    /**
+     * @param cctx Cache context.
+     * @param tx Transaction.
+     * @param timeout Timeout.
+     */
+    public GridNearTxQueryAbstractEnlistFuture(
+        GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) {
+        super(cctx, tx, timeout, CU.longReducer());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index 9a2dfa3..6d48b97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -43,9 +43,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTx
  * Cache lock future.
  */
 @SuppressWarnings("ForLoopReplaceableByForEach")
-public class GridNearTxQueryEnlistFuture extends GridNearTxAbstractEnlistFuture {
-    /** */
-    private static final long serialVersionUID = -2155104765461405820L;
+public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
     /** Involved cache ids. */
     private final int[] cacheIds;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
index dae1e81..d628de1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
@@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * A response to {@link GridNearTxQueryEnlistRequest}.
  */
 public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements ExceptionAware {
     /** */
@@ -99,6 +99,7 @@ public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements
      * @param lockVer Lock version.
      * @param res Result.
      * @param removeMapping Remove mapping flag.
+     * @param newDhtNodes New DHT nodes involved into transaction.
      */
     public GridNearTxQueryEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer, long res,
         boolean removeMapping, Set<UUID> newDhtNodes) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index 2452b92..b83339b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -62,10 +62,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
  * A future tracking requests for remote nodes transaction enlisting and locking
  * of entries produced with complex DML queries requiring reduce step.
  */
-public class GridNearTxQueryResultsEnlistFuture extends GridNearTxAbstractEnlistFuture {
-    /** */
-    private static final long serialVersionUID = 4339957209840477447L;
-
+public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
     /** */
     public static final int DFLT_BATCH_SIZE = 1024;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
index 94cacfa..48c63bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
@@ -54,7 +54,7 @@ public class GridNearTxQueryResultsEnlistResponse extends GridNearTxQueryEnlistR
      * @param res Result.
      * @param dhtFutId Dht future id.
      * @param dhtVer Dht version.
-     * @param newDhtNodes New
+     * @param newDhtNodes New DHT nodes involved into transaction.
      */
     public GridNearTxQueryResultsEnlistResponse(int cacheId,
         IgniteUuid futId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index ff1c85f..ca77bf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -1233,7 +1233,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                                             for (TxKey key : waitMap.keySet()) {
                                                 assert key.major() == snapshot.coordinatorVersion()
                                                     && key.minor() > snapshot.cleanupVersion()
-                                                    || key.major() > snapshot.coordinatorVersion();
+                                                    || key.major() > snapshot.coordinatorVersion() :
+                                                    "key=" + key + ", snapshot=" + snapshot;
                                             }
                                         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
index f46d1e0..9a767ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.NotNull;
 
@@ -46,7 +47,6 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
     private final IgniteLogger log;
 
     /** */
-    @GridToStringExclude
     private long crdVer;
 
     /** */
@@ -259,6 +259,9 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
         IgniteInternalFuture<AffinityTopologyVersion> waitFut =
             cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion());
 
+        if(log.isDebugEnabled())
+            log.debug("Remap on new topology: " + waitFut);
+
         if (waitFut == null)
             requestSnapshot(cctx.shared().exchange().readyAffinityVersion(), lsnr);
         else {
@@ -325,6 +328,11 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
         return true;
     }
 
+    /** {@inheritDoc} Built by {@code S.toString} for {@code MvccQueryTrackerImpl}. */
+    @Override public String toString() {
+        return S.toString(MvccQueryTrackerImpl.class, this);
+    }
+
     /** */
     private final class ListenerDecorator implements MvccSnapshotResponseListener {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c57a790..16c30c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSna
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -1795,13 +1796,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             GridCacheVersion ver,
             long expireTime,
             MvccSnapshot mvccVer,
+            CacheEntryPredicate filter,
             boolean primary,
             boolean needHistory,
-            boolean noCreate) throws IgniteCheckedException {
+            boolean noCreate,
+            boolean retVal) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            return delegate.mvccUpdate(
-                cctx, key, val, ver, expireTime, mvccVer, primary, needHistory, noCreate);
+            return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, primary,
+                needHistory, noCreate, retVal);
         }
 
         /** {@inheritDoc} */
@@ -1809,11 +1812,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             GridCacheContext cctx,
             KeyCacheObject key,
             MvccSnapshot mvccVer,
+            CacheEntryPredicate filter,
             boolean primary,
-            boolean needHistory) throws IgniteCheckedException {
+            boolean needHistory,
+            boolean retVal) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            return delegate.mvccRemove(cctx, key, mvccVer, primary, needHistory);
+            return delegate.mvccRemove(cctx, key, mvccVer, filter, primary, needHistory, retVal);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fb6293c..d0e3dca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1720,10 +1720,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      */
     public void markQueryEnlisted(MvccSnapshot ver) {
         if (!qryEnlisted) {
+            assert ver != null || mvccSnapshot != null;
+
             if (mvccSnapshot == null)
                 mvccSnapshot = ver;
 
-            cctx.coordinators().registerLocalTransaction(ver.coordinatorVersion(), ver.counter());
+            if(dht())
+                cctx.coordinators().registerLocalTransaction(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter());
 
             qryEnlisted = true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 7cc3e55..438c8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -466,7 +466,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @param timeout transaction timeout.
-     * @param sql Whether this transaction is being started via SQL API or not, or {@code null} if unknown.
+     * @param mvccOp Whether this transaction is being started via SQL API or not, or {@code null} if unknown.
      * @param txSize Expected transaction size.
      * @param lb Label.
      * @return New transaction.
@@ -479,7 +479,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         TransactionIsolation isolation,
         long timeout,
         boolean storeEnabled,
-        Boolean sql,
+        Boolean mvccOp,
         int txSize,
         @Nullable String lb
     ) {
@@ -499,7 +499,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             isolation,
             timeout,
             storeEnabled,
-            sql,
+            mvccOp,
             txSize,
             subjId,
             taskNameHash,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
index 716094e..2a0b582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
@@ -21,9 +21,12 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
@@ -38,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwar
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
@@ -49,6 +54,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isActiv
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccVersionIsValid;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException;
+import static org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType.FILTERED;
 
 /**
  *
@@ -94,6 +100,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
     /** Whether tx has overridden it's own update. */
     private static final int OWN_VALUE_OVERRIDDEN = DELETED << 1;
 
+    /** Force read full entry instead of header only.  */
+    private static final int NEED_PREV_VALUE = OWN_VALUE_OVERRIDDEN << 1;
+
     /** */
     @GridToStringExclude
     private final GridCacheContext cctx;
@@ -125,6 +134,10 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
     /** */
     private List<MvccLinkAwareSearchRow> historyRows;
 
+    /** */
+    @GridToStringExclude
+    private CacheEntryPredicate filter;
+
     /**
      * @param cctx Cache context.
      * @param key Key.
@@ -148,10 +161,12 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
         long expireTime,
         MvccSnapshot mvccSnapshot,
         MvccVersion newVer,
+        @Nullable CacheEntryPredicate filter,
         boolean primary,
         boolean lockOnly,
         boolean needHistory,
-        boolean fastUpdate) {
+        boolean fastUpdate,
+        boolean needPrevValue) {
         super(key,
             val,
             ver,
@@ -163,6 +178,7 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
 
         this.mvccSnapshot = mvccSnapshot;
         this.cctx = cctx;
+        this.filter = filter;
         this.keyAbsentBefore = primary; // True for primary and false for backup (backups do not use this flag).
 
         assert !lockOnly || val == null;
@@ -181,6 +197,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
         if (fastUpdate)
             flags |= FAST_UPDATE;
 
+        if(needPrevValue)
+            flags |= NEED_PREV_VALUE;
+
         setFlags(flags);
     }
 
@@ -237,8 +256,18 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
 
                 if (removed)
                     setFlags(DELETED);
-                else
-                    oldRow = row;
+                else {
+                    // Actually, full row can be omitted for replace(k,newval) and putIfAbsent, but
+                    // operation context is not available here and full row required if filter is set.
+                    if (res == ResultType.PREV_NOT_NULL && (isFlagsSet(NEED_PREV_VALUE) || filter != null))
+                        oldRow = tree.getRow(io, pageAddr, idx, RowData.FULL);
+                    else
+                        oldRow = row;
+                }
+
+                // TODO: IGNITE-9689: optimize filter usage here. See {@link org.apache.ignite.internal.processors.cache.CacheOperationFilter}.
+                if(filter != null && !applyFilter(res == ResultType.PREV_NOT_NULL ? oldRow.value() : null))
+                    res = FILTERED;
 
                 setFlags(LAST_COMMITTED_FOUND | OWN_VALUE_OVERRIDDEN);
 
@@ -293,9 +322,14 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
                     else {
                         res = ResultType.PREV_NOT_NULL;
 
-                        oldRow = row;
-
                         keyAbsentBefore = false;
+
+                        // Actually, full row can be omitted for replace(k,newval) and putIfAbsent, but
+                        // operation context is not available here and full row required if filter is set.
+                        if( (isFlagsSet(NEED_PREV_VALUE) || filter != null))
+                            oldRow = tree.getRow(io, pageAddr, idx, RowData.FULL);
+                        else
+                            oldRow = row;
                     }
 
                     if (isFlagsSet(CHECK_VERSION)) {
@@ -337,9 +371,13 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
                         }
                     }
 
+                    // TODO: IGNITE-9689: optimize filter usage here. See {@link org.apache.ignite.internal.processors.cache.CacheOperationFilter}.
+                    if(filter != null && !applyFilter(res == ResultType.PREV_NOT_NULL ? oldRow.value() : null))
+                        res = FILTERED;
+
                     // Lock entry for primary partition if needed.
                     // If invisible row is found for FAST_UPDATE case we should not lock row.
-                    if (isFlagsSet(PRIMARY | REMOVE_OR_LOCK) && !isFlagsSet(FAST_MISMATCH)) {
+                    if (!isFlagsSet(DELETED) && isFlagsSet(PRIMARY | REMOVE_OR_LOCK) && !isFlagsSet(FAST_MISMATCH)) {
                         rowIo.setMvccLockCoordinatorVersion(pageAddr, idx, mvccCrd);
                         rowIo.setMvccLockCounter(pageAddr, idx, mvccCntr);
 
@@ -423,6 +461,22 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
         return unsetFlags(FIRST);
     }
 
+    /**
+     * Applies {@code filter} to a detached entry for {@code key} that reports {@code val0} as its visible value.
+     *
+     * @param val0 Previous value; callers pass {@code null} when the result type is not {@code PREV_NOT_NULL}.
+     * @return Value returned by {@code filter.apply(...)}; {@code filter} must be non-null when this is called.
+     */
+    private boolean applyFilter(final CacheObject val0) {
+        GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key) {
+            @Nullable @Override public CacheObject peekVisibleValue() {
+                return val0;
+            }
+        };
+
+        return filter.apply(e);
+    }
+
     /** {@inheritDoc} */
     @Override public int state() {
         return state;
@@ -436,10 +490,26 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
     }
 
     /**
-     * @return {@code True} if previous value was non-null.
+     * @return Result type already set, or the one computed by {@link #defaultResult()} when none is set.
      */
-    @Override public ResultType resultType() {
-        return res == null ? ResultType.PREV_NULL : res;
+    @NotNull @Override public ResultType resultType() {
+        return res == null ? defaultResult() : res;
+    }
+
+    /**
+     * Computes the result type used when none was set and stores it in {@code res}; requires {@code res == null}.
+     *
+     * @return {@code FILTERED} if a filter is set and rejects a {@code null} value, otherwise {@code PREV_NULL}.
+     */
+    @NotNull private ResultType defaultResult() {
+        assert res == null;
+
+        if (filter != null && !applyFilter(null))
+            res = FILTERED;
+        else
+            res = ResultType.PREV_NULL; // Default.
+
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
index eecb4a5..16e7e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
@@ -30,5 +30,7 @@ public enum ResultType {
     /** */
     LOCKED,
     /** */
-    VERSION_MISMATCH
+    VERSION_MISMATCH,
+    /** */
+    FILTERED
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 9bbf03d..1a3c8d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -483,8 +483,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     /** {@inheritDoc} */
     @Override public GridCacheUpdateTxResult mvccSet(@Nullable IgniteInternalTx tx, UUID affNodeId, CacheObject val,
         long ttl0, AffinityTopologyVersion topVer, MvccSnapshot mvccVer,
-        GridCacheOperation op, boolean needHistory,
-        boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        GridCacheOperation op, boolean needHistory, boolean noCreate, CacheEntryPredicate filter, boolean retVal)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
         rawPut(val, ttl);
 
         return new GridCacheUpdateTxResult(true);
@@ -492,7 +492,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
 
     /** {@inheritDoc} */
     @Override public GridCacheUpdateTxResult mvccRemove(@Nullable IgniteInternalTx tx, UUID affNodeId,
-        AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory)
+        AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory,
+        CacheEntryPredicate filter, boolean retVal)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         obsoleteVer = ver;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
index dfc8b05..6a00ea4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
@@ -27,6 +27,9 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
@@ -154,6 +157,13 @@ public class IgniteCacheTxIteratorSelfTest extends GridCommonAbstractTest {
                 for (TransactionIsolation iso : TransactionIsolation.values()) {
                     for (TransactionConcurrency con : TransactionConcurrency.values()) {
                         try (Transaction transaction = ignite.transactions().txStart(con, iso)) {
+                            //TODO: IGNITE-7187: Fix when ticket will be implemented. (Near cache)
+                            //TODO: IGNITE-7956: Fix when ticket will be implemented. (Eviction)
+                            if (((IgniteCacheProxy)cache).context().mvccEnabled() &&
+                                ((iso != TransactionIsolation.REPEATABLE_READ && con != TransactionConcurrency.PESSIMISTIC)
+                                    || nearEnabled || useEvicPlc))
+                                return; // Nothing to do. Mode is not supported.
+
                             assertEquals(val, cache.get(key));
 
                             transaction.commit();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
index b2cbe05..c1718b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,10 +52,8 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  * Base class for Mvcc coordinator failover test.
@@ -73,6 +71,8 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
         ReadMode readMode,
         WriteMode writeMode
     ) throws Exception {
+        assert concurrency == PESSIMISTIC && isolation == REPEATABLE_READ;
+
         testSpi = true;
 
         startGrids(3);
@@ -169,7 +169,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
 
         final int KEYS = 100;
 
-        final Map<Integer, Integer> vals = new HashMap<>();
+        final Map<Integer, Integer> vals = new LinkedHashMap<>();
 
         for (int i = 0; i < KEYS; i++)
             vals.put(i, 0);
@@ -298,7 +298,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
                     Integer val = 1;
 
                     while (!done.get()) {
-                        Map<Integer, Integer> vals = new HashMap<>();
+                        Map<Integer, Integer> vals = new LinkedHashMap<>();
 
                         for (int i = 0; i < KEYS; i++)
                             vals.put(i, val);
@@ -479,9 +479,6 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
             ", crdChangeCnt=" + crdChangeCnt +
             ", readInTx=" + readInTx + ']');
 
-        TransactionConcurrency concurrency = readMode == ReadMode.GET ? OPTIMISTIC : PESSIMISTIC; // TODO IGNITE-7184
-        TransactionIsolation isolation = readMode == ReadMode.GET ? SERIALIZABLE : REPEATABLE_READ; // TODO IGNITE-7184
-
         testSpi = true;
 
         client = false;
@@ -510,7 +507,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
 
         final IgniteCache cache = getNode.createCache(ccfg);
 
-        final Set<Integer> keys = new HashSet<>();
+        final Set<Integer> keys = new TreeSet<>();
 
         List<Integer> keys1 = primaryKeys(jcache(COORDS), 10);
         List<Integer> keys2 = primaryKeys(jcache(COORDS + 1), 10);
@@ -518,7 +515,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
         keys.addAll(keys1);
         keys.addAll(keys2);
 
-        Map<Integer, Integer> vals = new HashMap();
+        Map<Integer, Integer> vals = new LinkedHashMap();
 
         for (Integer key : keys)
             vals.put(key, -1);
@@ -544,7 +541,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
                 Map<Integer, Integer> res = null;
 
                 if (readInTx) {
-                    try (Transaction tx = getNode.transactions().txStart(concurrency, isolation)) {
+                    try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                         res = readAllByMode(cache, keys, readMode, INTEGER_CODEC);
 
                         tx.rollback();
@@ -581,7 +578,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
             stopGrid(i);
 
         for (int i = 0; i < 10; i++) {
-            vals = new HashMap();
+            vals = new LinkedHashMap();
 
             for (Integer key : keys)
                 vals.put(key, i);
@@ -636,7 +633,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
 
         final IgniteCache cache = client.createCache(ccfg);
 
-        final Map<Integer, Integer> vals = new HashMap();
+        final Map<Integer, Integer> vals = new LinkedHashMap<>();
 
         for (int i = 0; i < 100; i++)
             vals.put(i, i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
index 54e4315..60f1a2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
@@ -113,27 +113,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc
     /**
      * @throws Exception If failed.
      */
-    public void testCoordinatorFailureSimpleSerializableTxPutGet() throws Exception {
-        coordinatorFailureSimple(OPTIMISTIC, SERIALIZABLE, GET, PUT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCoordinatorFailureSimpleOptimisticTxPutGet() throws Exception {
-        coordinatorFailureSimple(OPTIMISTIC, REPEATABLE_READ, GET, PUT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxInProgressCoordinatorChangeSimple_ReadonlyPutGet() throws Exception {
-        txInProgressCoordinatorChangeSimple(OPTIMISTIC, SERIALIZABLE, null, GET, PUT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testReadInProgressCoordinatorFailsSimple_FromClientPutGet() throws Exception {
         readInProgressCoordinatorFailsSimple(true, null, GET, PUT);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
index fe450d1..6c6b8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
@@ -173,7 +173,7 @@ public abstract class CacheMvccAbstractFeatureTest extends CacheMvccAbstractTest
             int idx;
 
             do {
-                idx = (int) (Math.random() * 100) + 1;
+                idx = (int) (Math.random() * 100);
             }
             while (!keys.add(idx));
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index a4962d1..c191849 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -22,10 +22,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -45,6 +48,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
@@ -92,11 +96,9 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -223,18 +225,21 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        persistence = false;
+
         try {
-            verifyOldVersionsCleaned();
+            if(disableScheduledVacuum)
+                verifyOldVersionsCleaned();
 
             verifyCoordinatorInternalState();
         }
         finally {
             stopAllGrids();
-        }
 
-        MvccProcessorImpl.coordinatorAssignClosure(null);
+            MvccProcessorImpl.coordinatorAssignClosure(null);
 
-        cleanPersistenceDir();
+            cleanPersistenceDir();
+        }
 
         super.afterTest();
     }
@@ -420,7 +425,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                             Integer id1 = Math.min(i1, i2);
                             Integer id2 = Math.max(i1, i2);
 
-                            TreeSet<Integer> keys = new TreeSet<>();
+                            Set<Integer> keys = new HashSet<>();
 
                             keys.add(id1);
                             keys.add(id2);
@@ -787,7 +792,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
      * @param cache Cache.
      * @return All accounts
      */
-    private static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) {
+    protected static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) {
         Map<Integer, MvccTestAccount> accounts = new HashMap<>();
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select _key, val, updateCnt from MvccTestAccount");
@@ -826,12 +831,28 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
      * @param cache Cache.
      * @param key Key.
      */
-    private static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) {
+    protected static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) {
         SqlFieldsQuery qry = new SqlFieldsQuery("delete from MvccTestAccount where _key=" + key);
 
         cache.cache.query(qry).getAll();
     }
 
+
+    /**
+     * Merges an account by running an SQL {@code MERGE INTO MvccTestAccount} statement through the cache query API.
+     *
+     * @param cache Test cache wrapper whose underlying cache executes the statement.
+     * @param key Value written to the {@code _key} column.
+     * @param val Value written to the {@code val} column.
+     * @param updateCnt Value written to the {@code updateCnt} column.
+     */
+    protected static void mergeSql(TestCache<Integer, MvccTestAccount> cache, Integer key, Integer val, Integer updateCnt) {
+        SqlFieldsQuery qry = new SqlFieldsQuery("merge into MvccTestAccount(_key, val, updateCnt) values " +
+            " (" + key+ ", " + val + ", " + updateCnt + ")");
+
+        cache.cache.query(qry).getAll();
+    }
+
     /**
      * Inserts account by means of SQL API.
      *
@@ -867,9 +888,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
         ReadMode readMode,
         WriteMode writeMode
     ) throws Exception {
-        if(readMode == SCAN && writeMode == PUT)
-            fail("https://issues.apache.org/jira/browse/IGNITE-7764");
-
         final int RANGE = 20;
 
         final int writers = 4;
@@ -886,15 +904,23 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                     info("Thread range [min=" + min + ", max=" + max + ']');
 
-                    Map<Integer, Integer> map = new HashMap<>();
+                    // Sorted map for put to avoid deadlocks.
+                    Map<Integer, Integer> map = new TreeMap<>();
+
+                    // Unordered key sequence.
+                    Set<Integer> keys = new LinkedHashSet<>();
 
                     int v = idx * 1_000_000;
 
                     boolean first = true;
 
                     while (!stop.get()) {
-                        while (map.size() < RANGE)
-                            map.put(rnd.nextInt(min, max), v);
+                        while (keys.size() < RANGE) {
+                            int key = rnd.nextInt(min, max);
+
+                            if (keys.add(key))
+                                map.put(key, v);
+                        }
 
                         TestCache<Integer, Integer> cache = randomCache(caches, rnd);
 
@@ -903,9 +929,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                             try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
                                 if (!first && rnd.nextBoolean()) {
-                                    Map<Integer, Integer> res = readAllByMode(cache.cache, map.keySet(), readMode, INTEGER_CODEC);
+                                    Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
 
-                                    for (Integer k : map.keySet())
+                                    for (Integer k : keys)
                                         assertEquals("res=" + res, v - 1, (Object)res.get(k));
                                 }
 
@@ -917,14 +943,12 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                             }
 
                             if (rnd.nextBoolean()) {
-                                Map<Integer, Integer> res = readAllByMode(cache.cache, map.keySet(), readMode, INTEGER_CODEC);
+                                Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
 
-                                for (Integer k : map.keySet())
+                                for (Integer k : keys)
                                     assertEquals("key=" + k, v, (Object)res.get(k));
                             }
 
-                            map.clear();
-
                             v++;
                         }
                         catch (Exception e) {
@@ -933,6 +957,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                         finally {
                             cache.readUnlock();
 
+                            keys.clear();
+
                             map.clear();
                         }
                     }
@@ -956,6 +982,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                         int min = range * RANGE;
                         int max = min + RANGE;
 
+                        keys.clear();
+
                         while (keys.size() < RANGE)
                             keys.add(rnd.nextInt(min, max));
 
@@ -1003,8 +1031,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                                 }
                             }
                         }
-
-                        keys.clear();
                     }
                 }
             };
@@ -1054,9 +1080,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
     )
         throws Exception
     {
-        if(readMode == SCAN && writeMode == PUT)
-            fail("https://issues.apache.org/jira/browse/IGNITE-7764");
-
         final int TOTAL = 20;
 
         assert N <= TOTAL;
@@ -1071,7 +1094,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
             @Override public void apply(IgniteCache<Object, Object> cache) {
                 final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
 
-                Map<Integer, Integer> vals = new HashMap<>();
+                Map<Integer, Integer> vals = new LinkedHashMap<>();
 
                 for (int i = 0; i < TOTAL; i++)
                     vals.put(i, N);
@@ -1341,6 +1364,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
             while (System.currentTimeMillis() < stopTime && !stop.get()) {
                 Thread.sleep(1000);
 
+                if (System.currentTimeMillis() >= stopTime || stop.get())
+                    break;
+
                 if (restartMode != null) {
                     switch (restartMode) {
                         case RESTART_CRD: {
@@ -1806,12 +1832,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                     }
                 });
 
+                Map res;
 
-                Map res = (Map)cache.query(scanQry).getAll()
-                    .stream()
-                    .collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue()));
+                try (QueryCursor qry = cache.query(scanQry)) {
+                    res = (Map)qry.getAll()
+                        .stream()
+                        .collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue()));
 
-                assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size());
+                    assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size());
+                }
 
                 return res;
 
@@ -1833,29 +1862,29 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                 String qry = b.toString();
 
-                SqlFieldsQuery sqlFieldsQry =  new SqlFieldsQuery(qry);
+                SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry);
 
                 if (emulateLongQry)
                     sqlFieldsQry.setLazy(true).setPageSize(1);
 
                 List<List> rows;
 
-                if (emulateLongQry) {
-                    FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry);
-
-                    rows = new ArrayList<>();
+                try (FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry)) {
+                    if (emulateLongQry) {
+                        rows = new ArrayList<>();
 
-                    for (List row : cur) {
-                        rows.add(row);
+                        for (List row : cur) {
+                            rows.add(row);
 
-                        doSleep(ThreadLocalRandom.current().nextInt(50));
+                            doSleep(ThreadLocalRandom.current().nextInt(50));
+                        }
                     }
+                    else
+                        rows = cur.getAll();
                 }
-                else
-                    rows = cache.query(sqlFieldsQry).getAll();
 
                 if (rows.isEmpty())
-                    return Collections.EMPTY_MAP;
+                    return Collections.emptyMap();
 
                 res = new HashMap();
 
@@ -1887,7 +1916,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                 rows = cur.getAll();
 
                 if (rows.isEmpty())
-                    return Collections.EMPTY_MAP;
+                    return Collections.emptyMap();
 
                 res = new HashMap();
 
@@ -2104,6 +2133,23 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass()) // Null or a different runtime class is never equal.
+                return false;
+            MvccTestAccount account = (MvccTestAccount)o;
+            return val == account.val && // Equality is defined by val and updateCnt only.
+                updateCnt == account.updateCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Hashes exactly the fields compared in equals(): val and updateCnt.
+            return Objects.hash(val, updateCnt);
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return "MvccTestAccount{" +
                 "val=" + val +