You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/04 16:03:22 UTC

[14/50] [abbrv] ignite git commit: IGNITE-7764: MVCC: cache API support. This closes #4725.

IGNITE-7764: MVCC: cache API support. This closes #4725.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f7f834bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f7f834bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f7f834bf

Branch: refs/heads/ignite-5797
Commit: f7f834bfaf8c4170ab852e829554c8ab5b373b77
Parents: 6f39115
Author: AMRepo <an...@gmail.com>
Authored: Fri Sep 28 15:57:24 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Sep 28 15:57:24 2018 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   6 +-
 .../communication/GridIoMessageFactory.java     |  14 +-
 .../processors/cache/GridCacheAdapter.java      |  29 +-
 .../processors/cache/GridCacheEntryEx.java      |  12 +-
 .../processors/cache/GridCacheMapEntry.java     | 164 +++--
 .../cache/GridCacheUpdateTxResult.java          |  23 +-
 .../cache/IgniteCacheOffheapManager.java        |  24 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  48 +-
 .../processors/cache/IgniteCacheProxyImpl.java  |   2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   | 134 +++-
 .../dht/GridDhtTxAbstractEnlistFuture.java      |  72 +-
 .../distributed/dht/GridDhtTxEnlistFuture.java  | 147 ++++
 .../dht/GridDhtTxQueryAbstractEnlistFuture.java |  83 +++
 .../dht/GridDhtTxQueryEnlistFuture.java         |  23 +-
 .../dht/GridDhtTxQueryResultsEnlistFuture.java  |  37 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   4 +
 .../dht/GridPartitionedSingleGetFuture.java     |   3 +-
 .../distributed/dht/NearTxResultHandler.java    | 128 ++++
 .../dht/colocated/GridDhtColocatedCache.java    | 138 ++--
 .../GridNearPessimisticTxPrepareFuture.java     |   9 +-
 .../near/GridNearTxAbstractEnlistFuture.java    |  20 +-
 .../near/GridNearTxEnlistFuture.java            | 683 +++++++++++++++++++
 .../near/GridNearTxEnlistRequest.java           | 642 +++++++++++++++++
 .../near/GridNearTxEnlistResponse.java          | 372 ++++++++++
 .../cache/distributed/near/GridNearTxLocal.java | 416 +++++++++--
 .../GridNearTxQueryAbstractEnlistFuture.java    |  36 +
 .../near/GridNearTxQueryEnlistFuture.java       |   4 +-
 .../near/GridNearTxQueryEnlistResponse.java     |   3 +-
 .../GridNearTxQueryResultsEnlistFuture.java     |   5 +-
 .../GridNearTxQueryResultsEnlistResponse.java   |   2 +-
 .../cache/mvcc/MvccProcessorImpl.java           |   3 +-
 .../cache/mvcc/MvccQueryTrackerImpl.java        |  10 +-
 .../persistence/GridCacheOffheapManager.java    |  15 +-
 .../transactions/IgniteTxLocalAdapter.java      |   5 +-
 .../cache/transactions/IgniteTxManager.java     |   6 +-
 .../cache/tree/mvcc/data/MvccUpdateDataRow.java |  88 ++-
 .../cache/tree/mvcc/data/ResultType.java        |   4 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   7 +-
 .../IgniteCacheTxIteratorSelfTest.java          |  10 +
 ...vccAbstractBasicCoordinatorFailoverTest.java |  25 +-
 ...acheMvccAbstractCoordinatorFailoverTest.java |  21 -
 .../mvcc/CacheMvccAbstractFeatureTest.java      |   2 +-
 .../cache/mvcc/CacheMvccAbstractTest.java       | 132 ++--
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 596 +++++++---------
 .../DataStreamProcessorMvccSelfTest.java        |   5 +
 .../configvariations/ConfigVariations.java      |   1 -
 .../query/h2/DhtResultSetEnlistFuture.java      |   4 +-
 .../query/h2/NearResultSetEnlistFuture.java     |   3 -
 ...sactionsCommandsWithMvccEnabledSelfTest.java |  78 +--
 ...cheMvccSelectForUpdateQueryAbstractTest.java |   2 +
 .../mvcc/CacheMvccSqlQueriesAbstractTest.java   |   4 +
 .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java |  15 +-
 .../mvcc/MvccRepeatableReadBulkOpsTest.java     | 441 ++++++++++++
 .../mvcc/MvccRepeatableReadOperationsTest.java  | 276 ++++++++
 .../testsuites/IgniteCacheMvccSqlTestSuite.java |   6 +
 .../ApiParity/IgniteConfigurationParityTest.cs  |   5 +-
 56 files changed, 4168 insertions(+), 879 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 9512bae..bcb9ef4 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -38,12 +38,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.GridCodegenConverter;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -168,7 +170,7 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-//        gen.generateAndWrite(GridNearTxQueryResultsEnlistRequest.class);
+        gen.generateAndWrite(GridNearTxEnlistResponse.class);
 
 //        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 41c75be..389d8c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -110,6 +110,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
@@ -1066,7 +1068,17 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124..129] [-23..-27] [-36..-55]- this
+            case 159:
+                msg = new GridNearTxEnlistRequest();
+
+                break;
+
+            case 160:
+                msg = new GridNearTxEnlistResponse();
+
+                break;
+
+            // [-3..119] [124..129] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             // [2048..2053] - Snapshots

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 476b083..cf9337b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -154,7 +154,9 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  * Adapter for different cache implementations.
@@ -1940,6 +1942,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         }
 
         if (tx == null || tx.implicit()) {
+            assert !ctx.mvccEnabled() || mvccSnapshot != null;
+
             Map<KeyCacheObject, EntryGetResult> misses = null;
 
             Set<GridCacheEntryEx> newLocalEntries = null;
@@ -1978,7 +1982,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                             boolean skipEntry = readNoEntry;
 
                             if (readNoEntry) {
-                                CacheDataRow row = mvccSnapshot != null ? ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
+                                CacheDataRow row = mvccSnapshot != null ?
+                                    ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
                                     ctx.offheap().read(ctx, key);
 
                                 if (row != null) {
@@ -3411,7 +3416,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(keys);
 
-        //TODO IGNITE-7764
+        //TODO: IGNITE-9324: add explicit locks support.
         MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
 
         IgniteInternalFuture<Boolean> fut = lockAllAsync(keys, timeout);
@@ -3442,7 +3447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKey(key);
 
-        //TODO IGNITE-7764
+        //TODO: IGNITE-9324: add explicit locks support.
         MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
 
         return lockAllAsync(Collections.singletonList(key), timeout);
@@ -4213,11 +4218,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     true,
                     op.single(),
                     ctx.systemTx() ? ctx : null,
-                    OPTIMISTIC,
-                    READ_COMMITTED,
+                    ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC,
+                    ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED,
                     tCfg.getDefaultTxTimeout(),
                     !ctx.skipStore(),
-                    false,
+                    ctx.mvccEnabled(),
                     0,
                     null
                 );
@@ -4315,11 +4320,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     true,
                     op.single(),
                     ctx.systemTx() ? ctx : null,
-                    OPTIMISTIC,
-                    READ_COMMITTED,
+                    ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC,
+                    ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED,
                     txCfg.getDefaultTxTimeout(),
                     !skipStore,
-                    false,
+                    ctx.mvccEnabled(),
                     0,
                     null);
 
@@ -4996,11 +5001,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 true,
                 op.single(),
                 ctx.systemTx() ? ctx : null,
-                OPTIMISTIC,
-                READ_COMMITTED,
+                ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC,
+                ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED,
                 CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(),
                 opCtx == null || !opCtx.skipStore(),
-                false,
+                ctx.mvccEnabled(),
                 0,
                 null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 18fa820..2e96a9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -352,6 +352,8 @@ public interface GridCacheEntryEx {
      * @param op Cache operation.
      * @param needHistory Whether to collect rows created or affected by the current tx.
      * @param noCreate Entry should not be created when enabled, e.g. SQL INSERT.
+     * @param filter Filter.
+     * @param retVal Previous value return flag.
      * @return Tuple containing success flag and old value. If success is {@code false},
      *      then value is {@code null}.
      * @throws IgniteCheckedException If storing value failed.
@@ -366,7 +368,9 @@ public interface GridCacheEntryEx {
         MvccSnapshot mvccVer,
         GridCacheOperation op,
         boolean needHistory,
-        boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException;
+        boolean noCreate,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * @param tx Cache transaction.
@@ -374,6 +378,8 @@ public interface GridCacheEntryEx {
      * @param topVer Topology version.
      * @param mvccVer Mvcc version.
      * @param needHistory Whether to collect rows created or affected by the current tx.
+     * @param filter Filter.
+     * @param retVal Previous value return flag.
      * @return Tuple containing success flag and old value. If success is {@code false},
      *      then value is {@code null}.
      * @throws IgniteCheckedException If storing value failed.
@@ -384,7 +390,9 @@ public interface GridCacheEntryEx {
         UUID affNodeId,
         AffinityTopologyVersion topVer,
         MvccSnapshot mvccVer,
-        boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException;
+        boolean needHistory,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * @param tx Transaction adapter.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 8fe559d..f58a3dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -1046,7 +1047,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         MvccSnapshot mvccVer,
         GridCacheOperation op,
         boolean needHistory,
-        boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        boolean noCreate,
+        CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert tx != null;
 
         final boolean valid = valid(tx.topologyVersion());
@@ -1087,7 +1090,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             assert val != null;
 
             res = cctx.offheap().mvccUpdate(
-                this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate);
+                this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, retVal);
 
             assert res != null;
 
@@ -1100,7 +1103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             if (res.resultType() == ResultType.VERSION_MISMATCH)
                 throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE);
-            else if (noCreate && res.resultType() == ResultType.PREV_NULL)
+            else if (res.resultType() == ResultType.FILTERED || (noCreate && res.resultType() == ResultType.PREV_NULL))
                 return new GridCacheUpdateTxResult(false);
             else if (res.resultType() == ResultType.LOCKED) {
                 unlockEntry();
@@ -1112,7 +1115,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
 
                 lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer,
-                    op, needHistory, noCreate, resFut));
+                    op, needHistory, noCreate, filter, retVal, resFut));
 
                 return new GridCacheUpdateTxResult(false, resFut);
             }
@@ -1141,17 +1144,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 counters.incrementUpdateCounter(cctx.cacheId(), partition());
             }
 
-            if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
+            if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) {
                 logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
-                        cctx.cacheId(),
-                        key,
-                        val,
-                        res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
-                        tx.nearXidVersion(),
-                        newVer,
-                        expireTime,
-                        key.partition(),
-                        0L)));
+                    cctx.cacheId(),
+                    key,
+                    val,
+                    res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
+                    tx.nearXidVersion(),
+                    newVer,
+                    expireTime,
+                    key.partition(),
+                    0L)));
+            }
 
             update(val, expireTime, ttl, newVer, true);
 
@@ -1172,6 +1176,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) :
             new GridCacheUpdateTxResult(false, logPtr);
 
+        CacheDataRow oldRow = ((MvccUpdateDataRow)res).oldRow();
+
+        if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) {
+            assert oldRow != null;
+
+            updRes.prevValue(oldRow.value());
+        }
+
         updRes.mvccHistory(res.history());
 
         return updRes;
@@ -1183,7 +1195,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         UUID affNodeId,
         AffinityTopologyVersion topVer,
         MvccSnapshot mvccVer,
-        boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        boolean needHistory,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert tx != null;
         assert mvccVer != null;
 
@@ -1204,13 +1218,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             assert newVer != null : "Failed to get write version for tx: " + tx;
 
-            res = cctx.offheap().mvccRemove(this, mvccVer, tx.local(), needHistory);
+            res = cctx.offheap().mvccRemove(this, mvccVer, tx.local(), needHistory, filter, retVal);
 
             assert res != null;
 
             if (res.resultType() == ResultType.VERSION_MISMATCH)
                 throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE);
-            else if (res.resultType() == ResultType.PREV_NULL)
+            else if (res.resultType() == ResultType.PREV_NULL || res.resultType() == ResultType.FILTERED)
                 return new GridCacheUpdateTxResult(false);
             else if (res.resultType() == ResultType.LOCKED) {
                 unlockEntry();
@@ -1222,7 +1236,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
 
                 lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory,
-                    resFut));
+                    resFut, retVal, filter));
 
                 return new GridCacheUpdateTxResult(false, resFut);
             }
@@ -1265,6 +1279,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) :
             new GridCacheUpdateTxResult(false, logPtr);
 
+        CacheDataRow oldRow = ((MvccUpdateDataRow)res).oldRow();
+
+        if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) {
+            assert oldRow != null;
+
+            updRes.prevValue(oldRow.value());
+        }
+
         updRes.mvccHistory(res.history());
 
         return updRes;
@@ -2264,12 +2286,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (updateMetrics &&
                 updateRes.outcome().updateReadMetrics() &&
                 needVal)
-                    cctx.cache().metrics0().onRead(oldVal != null);
+                cctx.cache().metrics0().onRead(oldVal != null);
 
             if (updateMetrics && INVOKE_NO_OP.equals(updateRes.outcome()) && (transformOp || updateRes.transformed()))
                 cctx.cache().metrics0().onReadOnlyInvoke(oldVal != null);
             else if (updateMetrics && REMOVE_NO_VAL.equals(updateRes.outcome())
-                    && (transformOp || updateRes.transformed()))
+                && (transformOp || updateRes.transformed()))
                 cctx.cache().metrics0().onInvokeRemove(oldVal != null);
 
             switch (updateRes.outcome()) {
@@ -3521,11 +3543,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     // Detach value before index update.
                     val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-                if (val != null) {
-                    if (cctx.mvccEnabled())
-                        cctx.offheap().mvccInitialValue(this, val, newVer, expTime);
-                    else
-                        storeValue(val, expTime, newVer);
+                    if (val != null) {
+                        if (cctx.mvccEnabled())
+                            cctx.offheap().mvccInitialValue(this, val, newVer, expTime);
+                        else
+                            storeValue(val, expTime, newVer);
 
                         if (deletedUnlocked())
                             deletedUnlocked(false);
@@ -4157,12 +4179,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
-     * Stores value inoffheap.*
+     * Stores value in off-heap.
+     *
      * @param val Value.
      * @param expireTime Expire time.
      * @param ver New entry version.
      * @param predicate Optional predicate.
-     *
      * @return {@code True} if storage was modified.
      * @throws IgniteCheckedException If update failed.
      */
@@ -4299,7 +4321,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (detached())
                 return rawGet();
 
-            for (;;) {
+            for (; ; ) {
                 GridCacheEntryEx e = cctx.cache().peekEx(key);
 
                 if (e == null)
@@ -4806,7 +4828,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 if (transformed)
                     cctx.cache().metrics0().onInvokeRemove(hasOldVal);
-            } else if (op == READ && transformed)
+            }
+            else if (op == READ && transformed)
                 cctx.cache().metrics0().onReadOnlyInvoke(hasOldVal);
             else {
                 cctx.cache().metrics0().onWrite();
@@ -4940,6 +4963,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         /** */
         private final GridFutureAdapter<GridCacheUpdateTxResult> resFut;
 
+        /** Need previous value flag. */
+        private final boolean needVal;
+
+        /** Filter. */
+        private final CacheEntryPredicate filter;
+
         /** */
         private GridCacheMapEntry entry;
 
@@ -4950,7 +4979,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             AffinityTopologyVersion topVer,
             MvccSnapshot mvccVer,
             boolean needHistory,
-            GridFutureAdapter<GridCacheUpdateTxResult> resFut) {
+            GridFutureAdapter<GridCacheUpdateTxResult> resFut,
+            boolean retVal,
+            @Nullable CacheEntryPredicate filter) {
             this.tx = tx;
             this.entry = entry;
             this.topVer = topVer;
@@ -4958,6 +4989,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             this.mvccVer = mvccVer;
             this.needHistory = needHistory;
             this.resFut = resFut;
+            this.needVal = retVal;
+            this.filter = filter;
         }
 
         /** {@inheritDoc} */
@@ -4989,8 +5022,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 cctx.shared().database().checkpointReadLock();
 
                 try {
-                    res = cctx.offheap().mvccRemove(entry, mvccVer, tx.local(), needHistory);
-                } finally {
+                    res = cctx.offheap().mvccRemove(entry, mvccVer, tx.local(), needHistory, filter, needVal);
+                }
+                finally {
                     cctx.shared().database().checkpointReadUnlock();
                 }
 
@@ -5001,7 +5035,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     return;
                 }
-                else if (res.resultType() == ResultType.PREV_NULL) {
+                else if (res.resultType() == ResultType.PREV_NULL || res.resultType() == ResultType.FILTERED) {
                     resFut.onDone(new GridCacheUpdateTxResult(false));
 
                     return;
@@ -5034,15 +5068,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                     logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
-                            cctx.cacheId(),
-                            entry.key(),
-                            null,
-                            DELETE,
-                            tx.nearXidVersion(),
-                            tx.writeVersion(),
-                            0,
-                            entry.key().partition(),
-                            0)));
+                        cctx.cacheId(),
+                        entry.key(),
+                        null,
+                        DELETE,
+                        tx.nearXidVersion(),
+                        tx.writeVersion(),
+                        0,
+                        entry.key().partition(),
+                        0)));
 
                 entry.update(null, 0, 0, newVer, true);
 
@@ -5209,6 +5243,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         /** */
         private final boolean noCreate;
 
+        /** Filter. */
+        private final CacheEntryPredicate filter;
+
+        /** Need previous value flag. */
+        private final boolean needVal;
+
         /** */
         MvccUpdateLockListener(IgniteInternalTx tx,
             GridCacheMapEntry entry,
@@ -5220,6 +5260,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             GridCacheOperation op,
             boolean needHistory,
             boolean noCreate,
+            CacheEntryPredicate filter,
+            boolean needVal,
             GridFutureAdapter<GridCacheUpdateTxResult> resFut) {
             this.tx = tx;
             this.entry = entry;
@@ -5231,6 +5273,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             this.op = op;
             this.needHistory = needHistory;
             this.noCreate = noCreate;
+            this.filter = filter;
+            this.needVal = needVal;
             this.resFut = resFut;
         }
 
@@ -5279,8 +5323,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 try {
                     res = cctx.offheap().mvccUpdate(
-                        entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate);
-                } finally {
+                        entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, needVal);
+                }
+                finally {
                     cctx.shared().database().checkpointReadUnlock();
                 }
 
@@ -5329,15 +5374,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                     logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
-                            cctx.cacheId(),
-                            entry.key(),
-                            val,
-                            res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
-                            tx.nearXidVersion(),
-                            newVer,
-                            expireTime,
-                            entry.key().partition(),
-                            0L)));
+                        cctx.cacheId(),
+                        entry.key(),
+                        val,
+                        res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
+                        tx.nearXidVersion(),
+                        newVer,
+                        expireTime,
+                        entry.key().partition(),
+                        0L)));
 
                 entry.update(val, expireTime, ttl, newVer, true);
 
@@ -6007,8 +6052,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             @Nullable IgniteBiTuple<Object, Exception> invokeRes,
             boolean readFromStore,
             boolean transformed)
-            throws IgniteCheckedException
-        {
+            throws IgniteCheckedException {
             GridCacheContext cctx = entry.context();
 
             final CacheObject oldVal = entry.val;
@@ -6059,7 +6103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
             else {
-               newSysTtl = newTtl = conflictCtx.ttl();
+                newSysTtl = newTtl = conflictCtx.ttl();
                 newSysExpireTime = newExpireTime = conflictCtx.expireTime();
             }
 
@@ -6166,8 +6210,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             @Nullable IgniteBiTuple<Object, Exception> invokeRes,
             boolean readFromStore,
             boolean transformed)
-            throws IgniteCheckedException
-        {
+            throws IgniteCheckedException {
             GridCacheContext cctx = entry.context();
 
             CacheObject oldVal = entry.val;
@@ -6264,8 +6307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private GridCacheVersionConflictContext<?, ?> resolveConflict(
             CacheObject newVal,
             @Nullable IgniteBiTuple<Object, Exception> invokeRes)
-            throws IgniteCheckedException
-        {
+            throws IgniteCheckedException {
             GridCacheContext cctx = entry.context();
 
             // Cache is conflict-enabled.
@@ -6437,7 +6479,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
             catch (Exception e) {
                 if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
-                    throw (IgniteException) e;
+                    throw (IgniteException)e;
 
                 writeObj = invokeEntry.valObj;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index b646cf9..4543dfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -40,14 +40,17 @@ public class GridCacheUpdateTxResult {
     private GridLongList mvccWaitTxs;
 
     /** */
-    private  GridFutureAdapter<GridCacheUpdateTxResult> fut;
+    private GridFutureAdapter<GridCacheUpdateTxResult> fut;
 
     /** */
     private WALPointer logPtr;
 
-    /** */
+    /** Mvcc history. */
     private List<MvccLinkAwareSearchRow> mvccHistory;
 
+    /** Previous value. */
+    private CacheObject prevVal;
+
     /**
      * Constructor.
      *
@@ -158,6 +161,22 @@ public class GridCacheUpdateTxResult {
         this.mvccHistory = mvccHistory;
     }
 
+    /**
+     * Gets the previous value stored in this update result.
+     * @return Previous value, may be {@code null}.
+     */
+    @Nullable  public CacheObject prevValue() {
+        return prevVal;
+    }
+
+    /**
+     * Sets the previous value stored in this update result.
+     * @param prevVal Previous value, may be {@code null}.
+     */
+    public void prevValue( @Nullable  CacheObject prevVal) {
+        this.prevVal = prevVal;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheUpdateTxResult.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index b4b6c9b..f576cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -276,6 +276,8 @@ public interface IgniteCacheOffheapManager {
      * @param primary {@code True} if on primary node.
      * @param needHistory Flag to collect history.
      * @param noCreate Flag indicating that row should not be created if absent.
+     * @param filter Filter.
+     * @param retVal Flag to return previous value.
      * @return Update result.
      * @throws IgniteCheckedException If failed.
      */
@@ -287,13 +289,17 @@ public interface IgniteCacheOffheapManager {
         MvccSnapshot mvccSnapshot,
         boolean primary,
         boolean needHistory,
-        boolean noCreate) throws IgniteCheckedException;
+        boolean noCreate,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException;
 
     /**
      * @param entry Entry.
      * @param mvccSnapshot MVCC snapshot.
      * @param primary {@code True} if on primary node.
      * @param needHistory Flag to collect history.
+     * @param filter Filter.
+     * @param retVal Flag to return previous value.
      * @return Update result.
      * @throws IgniteCheckedException If failed.
      */
@@ -301,7 +307,9 @@ public interface IgniteCacheOffheapManager {
         GridCacheMapEntry entry,
         MvccSnapshot mvccSnapshot,
         boolean primary,
-        boolean needHistory) throws IgniteCheckedException;
+        boolean needHistory,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException;
 
     /**
      * @param entry Entry.
@@ -788,9 +796,11 @@ public interface IgniteCacheOffheapManager {
          * @param ver Version.
          * @param expireTime Expire time.
          * @param mvccSnapshot MVCC snapshot.
+         * @param filter Filter.
          * @param primary {@code True} if update is executed on primary node.
          * @param needHistory Flag to collect history.
          * @param noCreate Flag indicating that row should not be created if absent.
+         * @param retVal Flag to return previous value.
          * @return Update result.
          * @throws IgniteCheckedException If failed.
          */
@@ -801,16 +811,20 @@ public interface IgniteCacheOffheapManager {
             GridCacheVersion ver,
             long expireTime,
             MvccSnapshot mvccSnapshot,
+            @Nullable CacheEntryPredicate filter,
             boolean primary,
             boolean needHistory,
-            boolean noCreate) throws IgniteCheckedException;
+            boolean noCreate,
+            boolean retVal) throws IgniteCheckedException;
 
         /**
          * @param cctx Cache context.
          * @param key Key.
          * @param mvccSnapshot MVCC snapshot.
+         * @param filter Filter.
          * @param primary {@code True} if update is executed on primary node.
          * @param needHistory Flag to collect history.
+         * @param retVal Flag to return previous value.
          * @return List of transactions to wait for.
          * @throws IgniteCheckedException If failed.
          */
@@ -818,8 +832,10 @@ public interface IgniteCacheOffheapManager {
             GridCacheContext cctx,
             KeyCacheObject key,
             MvccSnapshot mvccSnapshot,
+            @Nullable CacheEntryPredicate filter,
             boolean primary,
-            boolean needHistory) throws IgniteCheckedException;
+            boolean needHistory,
+            boolean retVal) throws IgniteCheckedException;
 
         /**
          * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 11e67d3..e0b9c06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -514,7 +514,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         MvccSnapshot mvccSnapshot,
         boolean primary,
         boolean needHistory,
-        boolean noCreate) throws IgniteCheckedException {
+        boolean noCreate,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException {
         if (entry.detached() || entry.isNear())
             return null;
 
@@ -526,9 +528,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             ver,
             expireTime,
             mvccSnapshot,
+            filter,
             primary,
             needHistory,
-            noCreate);
+            noCreate,
+            retVal);
     }
 
     /** {@inheritDoc} */
@@ -536,7 +540,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         GridCacheMapEntry entry,
         MvccSnapshot mvccSnapshot,
         boolean primary,
-        boolean needHistory) throws IgniteCheckedException {
+        boolean needHistory,
+        @Nullable CacheEntryPredicate filter,
+        boolean retVal) throws IgniteCheckedException {
         if (entry.detached() || entry.isNear())
             return null;
 
@@ -545,8 +551,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         return dataStore(entry.localPartition()).mvccRemove(entry.context(),
             entry.key(),
             mvccSnapshot,
+            filter,
             primary,
-            needHistory);
+            needHistory,
+            retVal);
     }
 
     /** {@inheritDoc} */
@@ -1848,9 +1856,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             GridCacheVersion ver,
             long expireTime,
             MvccSnapshot mvccSnapshot,
+            @Nullable CacheEntryPredicate filter,
             boolean primary,
             boolean needHistory,
-            boolean noCreate) throws IgniteCheckedException {
+            boolean noCreate,
+            boolean retVal) throws IgniteCheckedException {
             assert mvccSnapshot != null;
             assert primary || !needHistory;
 
@@ -1866,7 +1876,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 key.valueBytes(coCtx);
                 val.valueBytes(coCtx);
 
-                MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
+                 MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
                     cctx,
                     key,
                     val,
@@ -1875,11 +1885,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     expireTime,
                     mvccSnapshot,
                     null,
+                    filter,
                     primary,
                     false,
                     needHistory,
                     // we follow fast update visit flow here if row cannot be created by current operation
-                    noCreate);
+                    noCreate,
+                    retVal);
 
                 assert cctx.shared().database().checkpointLockIsHeldByThread();
 
@@ -1890,17 +1902,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 if (res == ResultType.LOCKED // cannot update locked
                     || res == ResultType.VERSION_MISMATCH) // cannot update on write conflict
                     return updateRow;
-                else if (res == ResultType.VERSION_FOUND) {
+                else if (res == ResultType.VERSION_FOUND || // exceptional case
+                        res == ResultType.FILTERED || // Operation should be skipped.
+                        (res == ResultType.PREV_NULL && noCreate)  // No op.
+                    ) {
                     // Do nothing, except cleaning up not needed versions
                     cleanup(cctx, updateRow.cleanupRows());
 
                     return updateRow;
                 }
-                else if (res == ResultType.PREV_NULL && noCreate) {
-                    cleanup(cctx, updateRow.cleanupRows());
-
-                    return updateRow;
-                }
 
                 CacheDataRow oldRow = null;
 
@@ -1961,8 +1971,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         @Override public MvccUpdateResult mvccRemove(GridCacheContext cctx,
             KeyCacheObject key,
             MvccSnapshot mvccSnapshot,
+            @Nullable CacheEntryPredicate filter,
             boolean primary,
-            boolean needHistory) throws IgniteCheckedException {
+            boolean needHistory,
+            boolean retVal) throws IgniteCheckedException {
             assert mvccSnapshot != null;
             assert primary || mvccSnapshot.activeTransactions().size() == 0 : mvccSnapshot;
             assert primary || !needHistory;
@@ -1987,10 +1999,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     0,
                     mvccSnapshot,
                     null,
+                    filter,
                     primary,
                     false,
                     needHistory,
-                    true);
+                    true,
+                    retVal);
 
                 assert cctx.shared().database().checkpointLockIsHeldByThread();
 
@@ -2001,7 +2015,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 if (res == ResultType.LOCKED // cannot update locked
                     || res == ResultType.VERSION_MISMATCH) // cannot update on write conflict
                     return updateRow;
-                else if (res == ResultType.VERSION_FOUND) {
+                else if (res == ResultType.VERSION_FOUND || res == ResultType.FILTERED) {
                     // Do nothing, except cleaning up not needed versions
                     cleanup(cctx, updateRow.cleanupRows());
 
@@ -2051,9 +2065,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     0,
                     mvccSnapshot,
                     null,
+                    null,
                     true,
                     true,
                     false,
+                    false,
                     false);
 
                 assert cctx.shared().database().checkpointLockIsHeldByThread();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 225fa81..4989efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -348,7 +348,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public Lock lockAll(final Collection<? extends K> keys) {
-        //TODO IGNITE-7764
+        //TODO: IGNITE-9324: add explicit locks support.
         MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
 
         return new CacheLockImpl<>(ctx.gate(), delegate, ctx.operationContextPerCall(), keys);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 4480dae..52638c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -60,6 +60,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
@@ -188,7 +191,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
         ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistResponse.class, new CI2<UUID, GridNearTxQueryEnlistResponse>() {
             @Override public void apply(UUID nodeId, GridNearTxQueryEnlistResponse req) {
-                processNearEnlistResponse(nodeId, req);
+                processNearTxQueryEnlistResponse(nodeId, req);
             }
         });
 
@@ -216,7 +219,21 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistResponse.class,
             new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() {
                 @Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistResponse req) {
-                    processNearTxEnlistResponse(nodeId, req);
+                    processNearTxQueryResultsEnlistResponse(nodeId, req);
+                }
+            });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class,
+            new CI2<UUID, GridNearTxEnlistRequest>() {
+                @Override public void apply(UUID nodeId, GridNearTxEnlistRequest req) {
+                    processNearTxEnlistRequest(nodeId, req);
+                }
+            });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class,
+            new CI2<UUID, GridNearTxEnlistResponse>() {
+                @Override public void apply(UUID nodeId, GridNearTxEnlistResponse msg) {
+                    processNearTxEnlistResponse(nodeId, msg);
                 }
             });
 
@@ -756,17 +773,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
     /**
      * @param nodeId Node ID.
-     * @param res Response.
-     */
-    private void processNearEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) {
-        GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId());
-
-        if (fut != null)
-            fut.onResult(nodeId, res);
-    }
-
-    /**
-     * @param nodeId Node ID.
      * @param req Request.
      */
     private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) {
@@ -801,7 +807,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         IgniteInternalFuture<?> f;
 
         if (req.firstClientRequest()) {
-            for (;;) {
+            for (; ; ) {
                 if (waitForExchangeFuture(nearNode, req))
                     return;
 
@@ -1079,9 +1085,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
                     if (log.isDebugEnabled()) {
                         log.debug("Client topology version mismatch, need remap lock request [" +
-                                "reqTopVer=" + req.topologyVersion() +
-                                ", locTopVer=" + top.readyTopologyVersion() +
-                                ", req=" + req + ']');
+                            "reqTopVer=" + req.topologyVersion() +
+                            ", locTopVer=" + top.readyTopologyVersion() +
+                            ", req=" + req + ']');
                     }
 
                     GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
@@ -1124,7 +1130,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                         if (tx == null || !tx.init()) {
                             String msg = "Failed to acquire lock (transaction has been completed): " +
-                                    req.version();
+                                req.version();
 
                             U.warn(log, msg);
 
@@ -1401,7 +1407,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                 int i = 0;
 
-                for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) {
+                for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext(); ) {
                     GridCacheEntryEx e = it.next();
 
                     assert e != null;
@@ -1995,6 +2001,71 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processNearTxEnlistRequest(UUID nodeId, final GridNearTxEnlistRequest req) {
+        assert nodeId != null;
+        assert req != null;
+
+        ClusterNode nearNode = ctx.discovery().node(nodeId);
+
+        GridDhtTxLocal tx;
+
+        try {
+            tx = initTxTopologyVersion(nodeId,
+                nearNode,
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                req.firstClientRequest(),
+                req.topologyVersion(),
+                req.threadId(),
+                req.txTimeout(),
+                req.subjectId(),
+                req.taskNameHash());
+        }
+        catch (IgniteCheckedException | IgniteException ex) {
+            GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(),
+                req.futureId(),
+                req.miniId(),
+                req.version(),
+                ex);
+
+            try {
+                ctx.io().send(nearNode, res, ctx.ioPolicy());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send near enlist response [" +
+                    "txId=" + req.version() +
+                    ", node=" + nodeId +
+                    ", res=" + res + ']', e);
+            }
+
+            return;
+        }
+
+        GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(
+            nodeId,
+            req.version(),
+            req.mvccSnapshot(),
+            req.threadId(),
+            req.futureId(),
+            req.miniId(),
+            tx,
+            req.timeout(),
+            ctx,
+            req.rows(),
+            req.operation(),
+            req.filter(),
+            req.needRes());
+
+        fut.listen(NearTxResultHandler.instance());
+
+        fut.init();
+    }
+
+    /**
      * @param nodeId Near node id.
      * @param nearNode Near node.
      * @param nearLockVer Near lock version.
@@ -2125,7 +2196,30 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) {
+    private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxEnlistResponse res) {
+        GridNearTxEnlistFuture fut = (GridNearTxEnlistFuture)
+            ctx.mvcc().versionedFuture(res.version(), res.futureId());
+
+        if (fut != null) // The response is dropped when no matching future is found.
+            fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID passed on to the future together with the response.
+     * @param res Response; delivered to the future found by its version and future ID, if any.
+     */
+    private void processNearTxQueryEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) {
+        GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId());
+
+        if (fut != null)
+            fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processNearTxQueryResultsEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) {
         GridNearTxQueryResultsEnlistFuture fut = (GridNearTxQueryResultsEnlistFuture)
             ctx.mvcc().versionedFuture(res.version(), res.futureId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index ad164e7..64f966d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -77,11 +78,10 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Abstract future processing transaction enlisting and locking
- * of entries produced with DML and SELECT FOR UPDATE queries.
+ * Abstract future processing transaction enlisting and locking.
  */
-public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapter<Long>
-    implements DhtLockFuture<Long> {
+public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAdapter<T>
+    implements DhtLockFuture<T> {
     /** Done field updater. */
     private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> DONE_UPD =
         AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "done");
@@ -134,9 +134,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     /** */
     protected final MvccSnapshot mvccSnapshot;
 
-    /** Processed entries count. */
-    protected long cnt;
-
     /** New DHT nodes. */
     protected Set<UUID> newDhtNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -146,6 +143,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     /** Near lock version. */
     protected final GridCacheVersion nearLockVer;
 
+    /** Filter. */
+    private final CacheEntryPredicate filter;
+
     /** Timeout object. */
     @GridToStringExclude
     protected LockTimeoutObject timeoutObj;
@@ -202,6 +202,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
      * @param tx Transaction.
      * @param timeout Lock acquisition timeout.
      * @param cctx Cache context.
+     * @param filter Filter.
      */
     protected GridDhtTxAbstractEnlistFuture(UUID nearNodeId,
         GridCacheVersion nearLockVer,
@@ -212,7 +213,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
         @Nullable int[] parts,
         GridDhtTxLocalAdapter tx,
         long timeout,
-        GridCacheContext<?, ?> cctx) {
+        GridCacheContext<?, ?> cctx,
+        @Nullable CacheEntryPredicate filter) {
         assert tx != null;
         assert timeout >= 0;
         assert nearNodeId != null;
@@ -229,6 +231,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
         this.timeout = timeout;
         this.tx = tx;
         this.parts = parts;
+        this.filter = filter;
 
         lockVer = tx.xidVersion();
 
@@ -238,12 +241,38 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     }
 
     /**
+     * Gets an iterator over the source to be updated.
+     *
      * @return iterator.
      * @throws IgniteCheckedException If failed.
      */
     protected abstract UpdateSourceIterator<?> createIterator() throws IgniteCheckedException;
 
     /**
+     * Gets the result value used to complete this future.
+     *
+     * @return Future result.
+     */
+    protected abstract T result0();
+
+    /**
+     * Gets need previous value flag; this base implementation always returns {@code false}.
+     *
+     * @return {@code True} if previous value is required.
+     */
+    public boolean needResult() {
+        return false;
+    }
+
+    /**
+     * Entry processed callback.
+     *
+     * @param key Key of the processed entry.
+     * @param res Update result for that entry.
+     */
+    protected abstract void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res);
+
+    /**
      *
      */
     public void init() {
@@ -291,14 +320,14 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
 
         boolean added = cctx.mvcc().addFuture(this, futId);
 
-        assert added;
-
         if (isDone()) {
             cctx.mvcc().removeFuture(futId);
 
             return;
         }
 
+        assert added;
+
         if (timeoutObj != null)
             cctx.time().addTimeoutObject(timeoutObj);
 
@@ -310,12 +339,15 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
             if (!it.hasNext()) {
                 U.close(it, log);
 
-                onDone(0L);
+                onDone(result0());
 
                 return;
             }
 
-            tx.addActiveCache(cctx, false);
+            if(!tx.implicitSingle())
+                tx.addActiveCache(cctx, false);
+            else // Nothing to do for single update.
+                assert tx.txState().cacheIds().contains(cctx.cacheId()) && tx.txState().cacheIds().size() == 1;
 
             this.it = it;
         }
@@ -391,7 +423,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                                         cctx.localNodeId(),
                                         topVer,
                                         mvccSnapshot,
-                                        isMoving(key.partition()));
+                                        isMoving(key.partition()),
+                                        filter,
+                                        needResult());
 
                                     break;
 
@@ -407,7 +441,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                                         mvccSnapshot,
                                         op.cacheOperation(),
                                         isMoving(key.partition()),
-                                        op.noCreate());
+                                        op.noCreate(),
+                                        filter,
+                                        needResult());
 
                                     break;
 
@@ -493,7 +529,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                     }
 
                     if (noPendingRequests()) {
-                        onDone(cnt);
+                        onDone(result0());
 
                         return;
                     }
@@ -569,11 +605,11 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
         if (ptr0 != null)
             walPtr = ptr0;
 
+        onEntryProcessed(entry.key(), updRes);
+
         if (!updRes.success())
             return;
 
-        cnt++;
-
         if (op != EnlistOperation.LOCK)
             addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId());
     }
@@ -980,7 +1016,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) {
+    @Override public boolean onDone(@Nullable T res, @Nullable Throwable err) {
         assert res != null || err != null;
 
         if (!DONE_UPD.compareAndSet(this, 0, 1))

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
new file mode 100644
index 0000000..58d6b15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future processing transaction enlisting and locking of entries produced by cache API operations.
+ */
+public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<GridCacheReturn> implements UpdateSourceIterator<Object> {
+    /** Enlist operation. */
+    private EnlistOperation op;
+
+    /** Source iterator. */
+    private Iterator<Object> it;
+
+    /** Future result. */
+    private GridCacheReturn res;
+
+    /** Need result flag. If {@code True} previous value should be returned as well. */
+    private boolean needRes;
+
+    /**
+     * Constructor.
+     *
+     * @param nearNodeId Near node ID.
+     * @param nearLockVer Near lock version.
+     * @param mvccSnapshot Mvcc snapshot.
+     * @param threadId Thread ID.
+     * @param nearFutId Near future id.
+     * @param nearMiniId Near mini future id.
+     * @param tx Transaction.
+     * @param timeout Lock acquisition timeout.
+     * @param cctx Cache context.
+     * @param rows Collection of rows.
+     * @param op Operation.
+     * @param filter Filter.
+     * @param needRes Return previous value flag.
+     */
+    public GridDhtTxEnlistFuture(UUID nearNodeId,
+        GridCacheVersion nearLockVer,
+        MvccSnapshot mvccSnapshot,
+        long threadId,
+        IgniteUuid nearFutId,
+        int nearMiniId,
+        GridDhtTxLocalAdapter tx,
+        long timeout,
+        GridCacheContext<?, ?> cctx,
+        Collection<Object> rows,
+        EnlistOperation op,
+        @Nullable CacheEntryPredicate filter,
+        boolean needRes) {
+        super(nearNodeId,
+            nearLockVer,
+            mvccSnapshot,
+            threadId,
+            nearFutId,
+            nearMiniId,
+            null,
+            tx,
+            timeout,
+            cctx,
+            filter);
+
+        this.op = op;
+        this.needRes = needRes;
+
+        it = rows.iterator();
+
+        res = new GridCacheReturn(cctx.localNodeId().equals(nearNodeId), false);
+
+        skipNearNodeUpdates = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable protected GridCacheReturn result0() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult txRes) {
+        if (needRes && txRes.success())
+            res.set(cctx, txRes.prevValue(), txRes.success(), true);
+        else
+            res.success(txRes.success());
+    }
+
+    /** {@inheritDoc} */
+    public boolean needResult() {
+        return needRes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public EnlistOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    public boolean hasNextX() {
+        return it.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    public Object nextX() {
+        return it.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxEnlistFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
new file mode 100644
index 0000000..0a26d75
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract future processing transaction enlisting and locking of entries produced with DML and SELECT FOR UPDATE
+ * queries.
+ */
+public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstractEnlistFuture<Long> {
+    /** Processed entries count. */
+    protected long cnt;
+
+    /**
+     * Constructor.
+     *
+     * @param nearNodeId Near node ID.
+     * @param nearLockVer Near lock version.
+     * @param mvccSnapshot Mvcc snapshot.
+     * @param threadId Thread ID.
+     * @param nearFutId Near future id.
+     * @param nearMiniId Near mini future id.
+     * @param parts Partitions.
+     * @param tx Transaction.
+     * @param timeout Lock acquisition timeout.
+     * @param cctx Cache context.
+     */
+    protected GridDhtTxQueryAbstractEnlistFuture(UUID nearNodeId,
+        GridCacheVersion nearLockVer,
+        MvccSnapshot mvccSnapshot,
+        long threadId,
+        IgniteUuid nearFutId,
+        int nearMiniId,
+        @Nullable int[] parts,
+        GridDhtTxLocalAdapter tx,
+        long timeout,
+        GridCacheContext<?, ?> cctx) {
+        super(nearNodeId,
+            nearLockVer,
+            mvccSnapshot,
+            threadId,
+            nearFutId,
+            nearMiniId,
+            null,
+            tx,
+            timeout,
+            cctx, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Long result0() {
+        return cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res) {
+        if(res.success())
+            cnt++;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
index dd30855..ed792f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -29,9 +28,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 
 /**
- * Cache lock future.
+ * Cache query lock future.
  */
-public final class GridDhtTxQueryEnlistFuture extends GridDhtTxAbstractEnlistFuture {
+public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture {
     /** Involved cache ids. */
     private final int[] cacheIds;
 
@@ -116,24 +115,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxAbstractEnlistFut
     }
 
     /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridDhtTxQueryEnlistFuture future = (GridDhtTxQueryEnlistFuture)o;
-
-        return Objects.equals(futId, future.futId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return futId.hashCode();
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtTxQueryEnlistFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
index b3d15d4..c6140fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -34,13 +33,11 @@ import org.apache.ignite.lang.IgniteUuid;
  * Future processing transaction enlisting and locking of entries
  * produces by complex DML queries with reduce step.
  */
-public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEnlistFuture implements UpdateSourceIterator<Object> {
-    /** */
-    private static final long serialVersionUID = -4933550335145438798L;
-    /** */
+public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements UpdateSourceIterator<Object> {
+    /** Enlist operation. */
     private EnlistOperation op;
 
-    /** */
+    /** Source iterator. */
     private Iterator<Object> it;
 
     /**
@@ -91,29 +88,6 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEn
     }
 
     /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridDhtTxQueryResultsEnlistFuture future = (GridDhtTxQueryResultsEnlistFuture)o;
-
-        return Objects.equals(futId, future.futId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return futId.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this);
-    }
-
-    /** {@inheritDoc} */
     @Override public EnlistOperation operation() {
         return op;
     }
@@ -127,4 +101,9 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEn
     @Override public Object nextX() {
         return it.next();
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 6662a1c..9883f6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -440,6 +440,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
                                         ctx.localNodeId(),
                                         topologyVersion(),
                                         snapshot,
+                                        false,
+                                        null,
                                         false);
 
                                     break;
@@ -456,6 +458,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
                                         snapshot,
                                         op.cacheOperation(),
                                         false,
+                                        false,
+                                        null,
                                         false);
 
                                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index f5689f9..5d3bef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -389,7 +389,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = mvccSnapshot != null ? cctx.offheap().mvccRead(cctx, key, mvccSnapshot) :
+                    CacheDataRow row = mvccSnapshot != null ?
+                        cctx.offheap().mvccRead(cctx, key, mvccSnapshot) :
                         cctx.offheap().read(cctx, key);
 
                     if (row != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
new file mode 100644
index 0000000..0bc00e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Builds the near enlist response for a completed DHT enlist future and sends it to the near node.
+ */
+public final class NearTxResultHandler implements CI1<IgniteInternalFuture<GridCacheReturn>> {
+    /** */
+    private static final long serialVersionUID = 0;
+
+    /** Singleton instance. */
+    private static final NearTxResultHandler INSTANCE = new NearTxResultHandler();
+
+    /** Constructor. */
+    private NearTxResultHandler() {
+    }
+
+    /**
+     * @return Handler instance.
+     */
+    public static NearTxResultHandler instance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Response factory method.
+     *
+     * @param future Enlist future.
+     * @return Enlist response.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createResponse(IgniteInternalFuture<?> future) {
+        assert future != null;
+
+        Class<?> clazz = future.getClass();
+
+        if (clazz == GridDhtTxEnlistFuture.class)
+            return (T)createResponse((GridDhtTxEnlistFuture)future);
+        else
+            throw new IllegalStateException();
+    }
+
+    /**
+     * Response factory method.
+     *
+     * @param fut Enlist future.
+     * @return Enlist response.
+     */
+    public static GridNearTxEnlistResponse createResponse(GridDhtTxEnlistFuture fut) {
+        try {
+            GridCacheReturn res = fut.get();
+
+            GridCacheVersion ver = null;
+            IgniteUuid id = null;
+
+            if (fut.hasNearNodeUpdates) {
+                ver = fut.cctx.tm().mappedVersion(fut.nearLockVer);
+
+                id = fut.futId;
+            }
+
+            return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId,
+                fut.nearLockVer, res, ver, id, fut.newDhtNodes);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId, fut.nearLockVer, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut0) {
+        GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture)fut0;
+
+        GridCacheContext<?, ?> cctx = fut.cctx;
+        GridDhtTxLocal tx = (GridDhtTxLocal)fut.tx;
+        UUID nearNodeId = fut.nearNodeId;
+
+        GridNearTxEnlistResponse res = createResponse(fut);
+
+        try {
+            cctx.io().send(nearNodeId, res, cctx.ioPolicy());
+        }
+        catch (IgniteCheckedException e) {
+            U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" +
+                "tx=" + CU.txString(tx) +
+                ", node=" + nearNodeId +
+                ", res=" + res + ']', e);
+
+            try {
+                tx.rollbackDhtLocalAsync();
+            }
+            catch (Throwable e1) {
+                e.addSuppressed(e1);
+            }
+
+            throw new GridClosureException(e);
+        }
+    }
+}