You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/03/23 07:15:40 UTC

[05/51] [abbrv] ignite git commit: Internal cache API cleanup.

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8ed749c..81606d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -18,24 +18,37 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.io.Externalizable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.cache.Cache;
+import javax.cache.CacheException;
 import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -46,31 +59,52 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -83,7 +117,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  * Replicated user transaction.
  */
 @SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable  {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -135,6 +169,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /** */
     private boolean hasRemoteLocks;
 
+    /** If this transaction contains transform entries. */
+    protected boolean transform;
+
+    /** */
+    @GridToStringExclude
+    private TransactionProxyImpl proxy;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -244,14 +285,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /**
      * Marks transaction to check if commit on backup.
      */
-    public void markForBackupCheck() {
+    void markForBackupCheck() {
         needCheckBackup = true;
     }
 
     /**
      * @return If need to check tx commit on backup.
      */
-    public boolean onNeedCheckBackup() {
+    boolean onNeedCheckBackup() {
         Boolean check = needCheckBackup;
 
         if (check != null && check) {
@@ -260,52 +301,2127 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             return true;
         }
 
-        return false;
-    }
+        return false;
+    }
+
+    /**
+     * @return If backup check was requested.
+     */
+    boolean needCheckBackup() {
+        return needCheckBackup != null;
+    }
+
+    /**
+     * @return {@code True} if transaction contains at least one near cache key mapped to the local node.
+     */
+    public boolean nearLocallyMapped() {
+        return nearLocallyMapped;
+    }
+
+    /**
+     * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local node.
+     */
+    void nearLocallyMapped(boolean nearLocallyMapped) {
+        this.nearLocallyMapped = nearLocallyMapped;
+    }
+
+    /**
+     * @return {@code True} if transaction contains colocated key mapped to the local node.
+     */
+    public boolean colocatedLocallyMapped() {
+        return colocatedLocallyMapped;
+    }
+
+    /**
+     * @param colocatedLocallyMapped {@code True} if transaction contains colocated key mapped to the local node.
+     */
+    public void colocatedLocallyMapped(boolean colocatedLocallyMapped) {
+        this.colocatedLocallyMapped = colocatedLocallyMapped;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
+        return entry.detached() || super.ownsLockUnsafe(entry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
+        return entry.detached() || super.ownsLock(entry);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param map Map to put.
+     * @param retval Flag indicating whether a value should be returned.
+     * @return Future for put operation.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
+        GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        Map<? extends K, ? extends V> map,
+        boolean retval
+    ) {
+        return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+            entryTopVer,
+            map,
+            null,
+            null,
+            null,
+            retval);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param val Value.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @return Future for put operation.
+     */
+    public final <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+        GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        K key,
+        V val,
+        boolean retval,
+        CacheEntryPredicate filter) {
+        return putAsync0(cacheCtx,
+            entryTopVer,
+            key,
+            val,
+            null,
+            null,
+            retval,
+            filter);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @return Operation future.
+     */
+    public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        K key,
+        EntryProcessor<K, V, Object> entryProcessor,
+        Object... invokeArgs) {
+        return (IgniteInternalFuture)putAsync0(cacheCtx,
+            entryTopVer,
+            key,
+            null,
+            entryProcessor,
+            invokeArgs,
+            true,
+            null);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param map Entry processors map.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @return Operation future.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+        GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
+        Object... invokeArgs
+    ) {
+        return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+            entryTopVer,
+            null,
+            map,
+            invokeArgs,
+            null,
+            true);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param drMap DR map to put.
+     * @return Future for DR put operation.
+     */
+    public IgniteInternalFuture<?> putAllDrAsync(
+        GridCacheContext cacheCtx,
+        Map<KeyCacheObject, GridCacheDrInfo> drMap
+    ) {
+        Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+            @Override public Object apply(GridCacheDrInfo val) {
+                return val.value();
+            }
+        });
+
+        return this.<Object, Object>putAllAsync0(cacheCtx,
+            null,
+            map,
+            null,
+            null,
+            drMap,
+            false);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param drMap DR map.
+     * @return Future for asynchronous remove.
+     */
+    public IgniteInternalFuture<?> removeAllDrAsync(
+        GridCacheContext cacheCtx,
+        Map<KeyCacheObject, GridCacheVersion> drMap
+    ) {
+        return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys to remove.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param filter Filter.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+     * @return Future for asynchronous remove.
+     */
+    public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
+        GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        Collection<? extends K> keys,
+        boolean retval,
+        CacheEntryPredicate filter,
+        boolean singleRmv
+    ) {
+        return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
+    }
+
+    /**
+     * Internal method for single update operation.
+     *
+     * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param val Value.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @return Operation future.
+     */
+    private <K, V> IgniteInternalFuture putAsync0(
+        final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        K key,
+        @Nullable V val,
+        @Nullable EntryProcessor<K, V, Object> entryProcessor,
+        @Nullable final Object[] invokeArgs,
+        final boolean retval,
+        @Nullable final CacheEntryPredicate filter
+    ) {
+        assert key != null;
+
+        try {
+            beforePut(cacheCtx, retval);
+
+            final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+            final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+            KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+            boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+            final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+            final IgniteInternalFuture<Void> loadFut = enlistWrite(
+                cacheCtx,
+                entryTopVer,
+                cacheKey,
+                val,
+                opCtx != null ? opCtx.expiry() : null,
+                entryProcessor,
+                invokeArgs,
+                retval,
+                /*lockOnly*/false,
+                filters,
+                ret,
+                opCtx != null && opCtx.skipStore(),
+                /*singleRmv*/false,
+                keepBinary,
+                dataCenterId);
+
+            if (pessimistic()) {
+                assert loadFut == null || loadFut.isDone() : loadFut;
+
+                if (loadFut != null)
+                    loadFut.get();
+
+                final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+
+                if (log.isDebugEnabled())
+                    log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+
+                long timeout = remainingTime();
+
+                if (timeout == -1)
+                    return new GridFinishedFuture<>(timeoutException());
+
+                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                    timeout,
+                    this,
+                    /*read*/entryProcessor != null, // Needed to force load from store.
+                    retval,
+                    isolation,
+                    isInvalidate(),
+                    -1L,
+                    -1L);
+
+                PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                    @Override public GridCacheReturn postLock(GridCacheReturn ret)
+                        throws IgniteCheckedException
+                    {
+                        if (log.isDebugEnabled())
+                            log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+                        postLockWrite(cacheCtx,
+                            enlisted,
+                            ret,
+                            /*remove*/false,
+                            retval,
+                            /*read*/false,
+                            -1L,
+                            filters,
+                            /*computeInvoke*/true);
+
+                        return ret;
+                    }
+                };
+
+                if (fut.isDone()) {
+                    try {
+                        return nonInterruptable(plc1.apply(fut.get(), null));
+                    }
+                    catch (GridClosureException e) {
+                        return new GridFinishedFuture<>(e.unwrap());
+                    }
+                    catch (IgniteCheckedException e) {
+                        try {
+                            return nonInterruptable(plc1.apply(false, e));
+                        }
+                        catch (Exception e1) {
+                            return new GridFinishedFuture<>(e1);
+                        }
+                    }
+                }
+                else {
+                    return nonInterruptable(new GridEmbeddedFuture<>(
+                        fut,
+                        plc1
+                    ));
+                }
+            }
+            else
+                return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+        catch (RuntimeException e) {
+            onException();
+
+            throw e;
+        }
+    }
+
+    /**
+     * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
+     * maps must be non-null.
+     *
+     * @param cacheCtx Context.
+     * @param map Key-value map to store.
+     * @param invokeMap Invoke map.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param drMap DR map.
+     * @param retval Key-transform value map to store.
+     * @return Operation future.
+     */
+    @SuppressWarnings("unchecked")
+    private <K, V> IgniteInternalFuture putAllAsync0(
+        final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        @Nullable Map<? extends K, ? extends V> map,
+        @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+        @Nullable final Object[] invokeArgs,
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
+        final boolean retval
+    ) {
+        try {
+            beforePut(cacheCtx, retval);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
+        final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+        final Byte dataCenterId;
+
+        if (opCtx != null && opCtx.hasDataCenterId()) {
+            assert drMap == null : drMap;
+            assert map != null || invokeMap != null;
+
+            dataCenterId = opCtx.dataCenterId();
+        }
+        else
+            dataCenterId = null;
+
+        // Cached entry may be passed only from entry wrapper.
+        final Map<?, ?> map0 = map;
+        final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+
+        if (log.isDebugEnabled())
+            log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+
+        assert map0 != null || invokeMap0 != null;
+
+        final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+        if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+            if (implicit())
+                try {
+                    commit();
+                }
+                catch (IgniteCheckedException e) {
+                    return new GridFinishedFuture<>(e);
+                }
+
+            return new GridFinishedFuture<>(ret.success(true));
+        }
+
+        try {
+            Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
+
+            final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
+
+            final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+            final IgniteInternalFuture<Void> loadFut = enlistWrite(
+                cacheCtx,
+                entryTopVer,
+                keySet,
+                opCtx != null ? opCtx.expiry() : null,
+                map0,
+                invokeMap0,
+                invokeArgs,
+                retval,
+                false,
+                CU.filterArray(null),
+                ret,
+                enlisted,
+                drMap,
+                null,
+                opCtx != null && opCtx.skipStore(),
+                false,
+                keepBinary,
+                dataCenterId);
+
+            if (pessimistic()) {
+                assert loadFut == null || loadFut.isDone() : loadFut;
+
+                if (loadFut != null) {
+                    try {
+                        loadFut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        return new GridFinishedFuture(e);
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
+
+                long timeout = remainingTime();
+
+                if (timeout == -1)
+                    return new GridFinishedFuture<>(timeoutException());
+
+                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                    timeout,
+                    this,
+                    /*read*/invokeMap != null, // Needed to force load from store.
+                    retval,
+                    isolation,
+                    isInvalidate(),
+                    -1L,
+                    -1L);
+
+                PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                    @Override public GridCacheReturn postLock(GridCacheReturn ret)
+                        throws IgniteCheckedException
+                    {
+                        if (log.isDebugEnabled())
+                            log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+                        postLockWrite(cacheCtx,
+                            enlisted,
+                            ret,
+                            /*remove*/false,
+                            retval,
+                            /*read*/false,
+                            -1L,
+                            CU.filterArray(null),
+                            /*computeInvoke*/true);
+
+                        return ret;
+                    }
+                };
+
+                if (fut.isDone()) {
+                    try {
+                        return nonInterruptable(plc1.apply(fut.get(), null));
+                    }
+                    catch (GridClosureException e) {
+                        return new GridFinishedFuture<>(e.unwrap());
+                    }
+                    catch (IgniteCheckedException e) {
+                        try {
+                            return nonInterruptable(plc1.apply(false, e));
+                        }
+                        catch (Exception e1) {
+                            return new GridFinishedFuture<>(e1);
+                        }
+                    }
+                }
+                else {
+                    return nonInterruptable(new GridEmbeddedFuture<>(
+                        fut,
+                        plc1
+                    ));
+                }
+            }
+            else
+                return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+        }
+        catch (RuntimeException e) {
+            onException();
+
+            throw e;
+        }
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param cacheKey Key to enlist.
+     * @param val Value.
+     * @param expiryPlc Explicitly specified expiry policy for entry.
+     * @param entryProcessor Entry processor (for invoke operation).
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+     * @param filter User filters.
+     * @param ret Return value.
+     * @param skipStore Skip store flag.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+     * @return Future for entry values loading.
+     */
+    private <K, V> IgniteInternalFuture<Void> enlistWrite(
+        final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        KeyCacheObject cacheKey,
+        Object val,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable EntryProcessor<K, V, Object> entryProcessor,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        boolean lockOnly,
+        final CacheEntryPredicate[] filter,
+        final GridCacheReturn ret,
+        boolean skipStore,
+        final boolean singleRmv,
+        boolean keepBinary,
+        Byte dataCenterId) {
+        try {
+            addActiveCache(cacheCtx);
+
+            final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+            final boolean needVal = singleRmv || retval || hasFilters;
+            final boolean needReadVer = needVal && (serializable() && optimistic());
+
+            if (entryProcessor != null)
+                transform = true;
+
+            GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
+            boolean loadMissed = enlistWriteEntry(cacheCtx,
+                entryTopVer,
+                cacheKey,
+                val,
+                entryProcessor,
+                invokeArgs,
+                expiryPlc,
+                retval,
+                lockOnly,
+                filter,
+                /*drVer*/drVer,
+                /*drTtl*/-1L,
+                /*drExpireTime*/-1L,
+                ret,
+                /*enlisted*/null,
+                skipStore,
+                singleRmv,
+                hasFilters,
+                needVal,
+                needReadVer,
+                keepBinary);
+
+            if (loadMissed) {
+                AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                if (topVer == null)
+                    topVer = entryTopVer;
+
+                return loadMissing(cacheCtx,
+                    topVer != null ? topVer : topologyVersion(),
+                    Collections.singleton(cacheKey),
+                    filter,
+                    ret,
+                    needReadVer,
+                    singleRmv,
+                    hasFilters,
+                    /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+                    retval,
+                    keepBinary,
+                    expiryPlc);
+            }
+
+            return new GridFinishedFuture<>();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * Internal routine for <tt>putAll(..)</tt>
+     *
+     * @param cacheCtx Cache context.
+     * @param keys Keys to enlist.
+     * @param expiryPlc Explicitly specified expiry policy for entry.
+     * @param lookup Value lookup map ({@code null} for remove).
+     * @param invokeMap Map with entry processors for invoke operation.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+     * @param filter User filters.
+     * @param ret Return value.
+     * @param enlisted Collection of keys enlisted into this transaction.
+     * @param drPutMap DR put map (optional).
+     * @param drRmvMap DR remove map (optional).
+     * @param skipStore Skip store flag.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+     * @param keepBinary Keep binary flag.
+     * @param dataCenterId Optional data center ID.
+     * @return Future for missing values loading.
+     */
+    private <K, V> IgniteInternalFuture<Void> enlistWrite(
+        final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        Collection<?> keys,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable Map<?, ?> lookup,
+        @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        boolean lockOnly,
+        final CacheEntryPredicate[] filter,
+        final GridCacheReturn ret,
+        Collection<KeyCacheObject> enlisted,
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
+        boolean skipStore,
+        final boolean singleRmv,
+        final boolean keepBinary,
+        Byte dataCenterId
+    ) {
+        assert retval || invokeMap == null;
+
+        try {
+            addActiveCache(cacheCtx);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+
+        boolean rmv = lookup == null && invokeMap == null;
+
+        final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+        final boolean needVal = singleRmv || retval || hasFilters;
+        final boolean needReadVer = needVal && (serializable() && optimistic());
+
+        try {
+            // Set transform flag for transaction.
+            if (invokeMap != null)
+                transform = true;
+
+            Set<KeyCacheObject> missedForLoad = null;
+
+            for (Object key : keys) {
+                if (key == null) {
+                    rollback();
+
+                    throw new NullPointerException("Null key.");
+                }
+
+                Object val = rmv || lookup == null ? null : lookup.get(key);
+                EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
+
+                GridCacheVersion drVer;
+                long drTtl;
+                long drExpireTime;
+
+                if (drPutMap != null) {
+                    GridCacheDrInfo info = drPutMap.get(key);
+
+                    assert info != null;
+
+                    drVer = info.version();
+                    drTtl = info.ttl();
+                    drExpireTime = info.expireTime();
+                }
+                else if (drRmvMap != null) {
+                    assert drRmvMap.get(key) != null;
+
+                    drVer = drRmvMap.get(key);
+                    drTtl = -1L;
+                    drExpireTime = -1L;
+                }
+                else if (dataCenterId != null) {
+                    drVer = cctx.versions().next(dataCenterId);
+                    drTtl = -1L;
+                    drExpireTime = -1L;
+                }
+                else {
+                    drVer = null;
+                    drTtl = -1L;
+                    drExpireTime = -1L;
+                }
+
+                if (!rmv && val == null && entryProcessor == null) {
+                    setRollbackOnly();
+
+                    throw new NullPointerException("Null value.");
+                }
+
+                KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+                boolean loadMissed = enlistWriteEntry(cacheCtx,
+                    entryTopVer,
+                    cacheKey,
+                    val,
+                    entryProcessor,
+                    invokeArgs,
+                    expiryPlc,
+                    retval,
+                    lockOnly,
+                    filter,
+                    drVer,
+                    drTtl,
+                    drExpireTime,
+                    ret,
+                    enlisted,
+                    skipStore,
+                    singleRmv,
+                    hasFilters,
+                    needVal,
+                    needReadVer,
+                    keepBinary);
+
+                if (loadMissed) {
+                    if (missedForLoad == null)
+                        missedForLoad = new HashSet<>();
+
+                    missedForLoad.add(cacheKey);
+                }
+            }
+
+            if (missedForLoad != null) {
+                AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                if (topVer == null)
+                    topVer = entryTopVer;
+
+                return loadMissing(cacheCtx,
+                    topVer != null ? topVer : topologyVersion(),
+                    missedForLoad,
+                    filter,
+                    ret,
+                    needReadVer,
+                    singleRmv,
+                    hasFilters,
+                    /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+                    retval,
+                    keepBinary,
+                    expiryPlc);
+            }
+
+            return new GridFinishedFuture<>();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param cacheKey Key.
+     * @param val Value.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param expiryPlc Explicitly specified expiry policy for entry.
+     * @param retval Return value flag.
+     * @param lockOnly Lock only flag.
+     * @param filter Filter.
+     * @param drVer DR version.
+     * @param drTtl DR ttl.
+     * @param drExpireTime DR expire time.
+     * @param ret Return value.
+     * @param enlisted Enlisted keys collection.
+     * @param skipStore Skip store flag.
+     * @param singleRmv {@code True} for single remove operation.
+     * @param hasFilters {@code True} if filters not empty.
+     * @param needVal {@code True} if value is needed.
+     * @param needReadVer {@code True} if need read entry version.
+     * @return {@code True} if entry value should be loaded.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        final KeyCacheObject cacheKey,
+        @Nullable final Object val,
+        @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
+        @Nullable final Object[] invokeArgs,
+        @Nullable final ExpiryPolicy expiryPlc,
+        final boolean retval,
+        final boolean lockOnly,
+        final CacheEntryPredicate[] filter,
+        final GridCacheVersion drVer,
+        final long drTtl,
+        long drExpireTime,
+        final GridCacheReturn ret,
+        @Nullable final Collection<KeyCacheObject> enlisted,
+        boolean skipStore,
+        boolean singleRmv,
+        boolean hasFilters,
+        final boolean needVal,
+        boolean needReadVer,
+        boolean keepBinary
+    ) throws IgniteCheckedException {
+        boolean loadMissed = false;
+
+        final boolean rmv = val == null && entryProcessor == null;
+
+        IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+        IgniteTxEntry txEntry = entry(txKey);
+
+        // First time access.
+        if (txEntry == null) {
+            while (true) {
+                GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
+
+                try {
+                    entry.unswap(false);
+
+                    // Check if lock is being explicitly acquired by the same thread.
+                    if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+                        entry.lockedByThread(threadId, xidVer)) {
+                        throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+                            "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+                            ", entry=" + entry +
+                            ", xidVer=" + xidVer +
+                            ", threadId=" + threadId +
+                            ", locNodeId=" + cctx.localNodeId() + ']');
+                    }
+
+                    CacheObject old = null;
+                    GridCacheVersion readVer = null;
+
+                    if (optimistic() && !implicit()) {
+                        try {
+                            if (needReadVer) {
+                                EntryGetResult res = primaryLocal(entry) ?
+                                    entry.innerGetVersioned(
+                                        null,
+                                        this,
+                                        /*swap*/false,
+                                        /*unmarshal*/retval || needVal,
+                                        /*metrics*/retval,
+                                        /*events*/retval,
+                                        CU.subjectId(this, cctx),
+                                        entryProcessor,
+                                        resolveTaskName(),
+                                        null,
+                                        keepBinary,
+                                        null) : null;
+
+                                if (res != null) {
+                                    old = res.value();
+                                    readVer = res.version();
+                                }
+                            }
+                            else {
+                                old = entry.innerGet(
+                                    null,
+                                    this,
+                                    /*swap*/false,
+                                    /*read-through*/false,
+                                    /*metrics*/retval,
+                                    /*events*/retval,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    entryProcessor,
+                                    resolveTaskName(),
+                                    null,
+                                    keepBinary);
+                            }
+                        }
+                        catch (ClusterTopologyCheckedException e) {
+                            entry.context().evicts().touch(entry, topologyVersion());
+
+                            throw e;
+                        }
+                    }
+                    else
+                        old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+
+                    final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+                        entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+                    if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+                        ret.set(cacheCtx, old, false, keepBinary);
+
+                        if (!readCommitted()) {
+                            if (optimistic() && serializable()) {
+                                txEntry = addEntry(op,
+                                    old,
+                                    entryProcessor,
+                                    invokeArgs,
+                                    entry,
+                                    expiryPlc,
+                                    filter,
+                                    true,
+                                    drTtl,
+                                    drExpireTime,
+                                    drVer,
+                                    skipStore,
+                                    keepBinary);
+                            }
+                            else {
+                                txEntry = addEntry(READ,
+                                    old,
+                                    null,
+                                    null,
+                                    entry,
+                                    null,
+                                    CU.empty0(),
+                                    false,
+                                    -1L,
+                                    -1L,
+                                    null,
+                                    skipStore,
+                                    keepBinary);
+                            }
+
+                            txEntry.markValid();
+
+                            if (needReadVer) {
+                                assert readVer != null;
+
+                                txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                            }
+                        }
+
+                        if (readCommitted())
+                            cacheCtx.evicts().touch(entry, topologyVersion());
+
+                        break; // While.
+                    }
+
+                    txEntry = addEntry(op,
+                        cacheCtx.toCacheObject(val),
+                        entryProcessor,
+                        invokeArgs,
+                        entry,
+                        expiryPlc,
+                        filter,
+                        true,
+                        drTtl,
+                        drExpireTime,
+                        drVer,
+                        skipStore,
+                        keepBinary);
+
+                    if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
+                        cacheCtx.evicts().touch(entry, topologyVersion());
+
+                    if (enlisted != null)
+                        enlisted.add(cacheKey);
+
+                    if (!pessimistic() && !implicit()) {
+                        txEntry.markValid();
+
+                        if (old == null) {
+                            if (needVal)
+                                loadMissed = true;
+                            else {
+                                assert !implicit() || !transform : this;
+                                assert txEntry.op() != TRANSFORM : txEntry;
+
+                                if (retval)
+                                    ret.set(cacheCtx, null, true, keepBinary);
+                                else
+                                    ret.success(true);
+                            }
+                        }
+                        else {
+                            if (needReadVer) {
+                                assert readVer != null;
+
+                                txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                            }
+
+                            if (retval && !transform)
+                                ret.set(cacheCtx, old, true, keepBinary);
+                            else {
+                                if (txEntry.op() == TRANSFORM) {
+                                    GridCacheVersion ver;
+
+                                    try {
+                                        ver = entry.version();
+                                    }
+                                    catch (GridCacheEntryRemovedException ex) {
+                                        assert optimistic() : txEntry;
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to get entry version " +
+                                                "[err=" + ex.getMessage() + ']');
+
+                                        ver = null;
+                                    }
+
+                                    addInvokeResult(txEntry, old, ret, ver);
+                                }
+                                else
+                                    ret.success(true);
+                            }
+                        }
+                    }
+                    // Pessimistic.
+                    else {
+                        if (retval && !transform)
+                            ret.set(cacheCtx, old, true, keepBinary);
+                        else
+                            ret.success(true);
+                    }
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in transaction putAll0 method: " + entry);
+                }
+            }
+        }
+        else {
+            if (entryProcessor == null && txEntry.op() == TRANSFORM)
+                throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+                    "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
+
+            GridCacheEntryEx entry = txEntry.cached();
+
+            CacheObject v = txEntry.value();
+
+            boolean del = txEntry.op() == DELETE && rmv;
+
+            if (!del) {
+                if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+                    ret.set(cacheCtx, v, false, keepBinary);
+
+                    return loadMissed;
+                }
+
+                GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+                    v != null ? UPDATE : CREATE;
+
+                txEntry = addEntry(op,
+                    cacheCtx.toCacheObject(val),
+                    entryProcessor,
+                    invokeArgs,
+                    entry,
+                    expiryPlc,
+                    filter,
+                    true,
+                    drTtl,
+                    drExpireTime,
+                    drVer,
+                    skipStore,
+                    keepBinary);
+
+                if (enlisted != null)
+                    enlisted.add(cacheKey);
+
+                if (txEntry.op() == TRANSFORM) {
+                    GridCacheVersion ver;
+
+                    try {
+                        ver = entry.version();
+                    }
+                    catch (GridCacheEntryRemovedException e) {
+                        assert optimistic() : txEntry;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+                        ver = null;
+                    }
+
+                    addInvokeResult(txEntry, txEntry.value(), ret, ver);
+                }
+            }
+
+            if (!pessimistic()) {
+                txEntry.markValid();
+
+                if (retval && !transform)
+                    ret.set(cacheCtx, v, true, keepBinary);
+                else
+                    ret.success(true);
+            }
+        }
+
+        return loadMissed;
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys to remove.
+     * @param drMap DR map.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param filter Filter.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+     * @return Future for asynchronous remove.
+     */
+    @SuppressWarnings("unchecked")
+    private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
+        final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        @Nullable final Collection<? extends K> keys,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
+        final boolean retval,
+        @Nullable final CacheEntryPredicate filter,
+        boolean singleRmv) {
+        try {
+            checkUpdatesAllowed(cacheCtx);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+
+        cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+
+        if (retval)
+            needReturnValue(true);
+
+        final Collection<?> keys0;
+
+        if (drMap != null) {
+            assert keys == null;
+
+            keys0 = drMap.keySet();
+        }
+        else
+            keys0 = keys;
+
+        CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+        final Byte dataCenterId;
+
+        if (opCtx != null && opCtx.hasDataCenterId()) {
+            assert drMap == null : drMap;
+
+            dataCenterId = opCtx.dataCenterId();
+        }
+        else
+            dataCenterId = null;
+
+        assert keys0 != null;
+
+        if (log.isDebugEnabled())
+            log.debug(S.toString("Called removeAllAsync(...)",
+                "tx", this, false,
+                "keys", keys0, true,
+                "implicit", implicit, false,
+                "retval", retval, false));
+
+        try {
+            checkValid();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+
+        final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+        if (F.isEmpty(keys0)) {
+            if (implicit()) {
+                try {
+                    commit();
+                }
+                catch (IgniteCheckedException e) {
+                    return new GridFinishedFuture<>(e);
+                }
+            }
+
+            return new GridFinishedFuture<>(ret.success(true));
+        }
+
+        init();
+
+        final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+
+        ExpiryPolicy plc;
+
+        final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+        if (!F.isEmpty(filters))
+            plc = opCtx != null ? opCtx.expiry() : null;
+        else
+            plc = null;
+
+        final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+        final IgniteInternalFuture<Void> loadFut = enlistWrite(
+            cacheCtx,
+            entryTopVer,
+            keys0,
+            plc,
+            /*lookup map*/null,
+            /*invoke map*/null,
+            /*invoke arguments*/null,
+            retval,
+            /*lock only*/false,
+            filters,
+            ret,
+            enlisted,
+            null,
+            drMap,
+            opCtx != null && opCtx.skipStore(),
+            singleRmv,
+            keepBinary,
+            dataCenterId
+        );
+
+        if (log.isDebugEnabled())
+            log.debug("Remove keys: " + enlisted);
+
+        // Acquire locks only after having added operation to the write set.
+        // Otherwise, during rollback we will not know whether locks need
+        // to be rolled back.
+        if (pessimistic()) {
+            assert loadFut == null || loadFut.isDone() : loadFut;
+
+            if (loadFut != null) {
+                try {
+                    loadFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    return new GridFinishedFuture<>(e);
+                }
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
+
+            long timeout = remainingTime();
+
+            if (timeout == -1)
+                return new GridFinishedFuture<>(timeoutException());
+
+            IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                timeout,
+                this,
+                false,
+                retval,
+                isolation,
+                isInvalidate(),
+                -1L,
+                -1L);
+
+            PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+                    throws IgniteCheckedException
+                {
+                    if (log.isDebugEnabled())
+                        log.debug("Acquired transaction lock for remove on keys: " + enlisted);
+
+                    postLockWrite(cacheCtx,
+                        enlisted,
+                        ret,
+                        /*remove*/true,
+                        retval,
+                        /*read*/false,
+                        -1L,
+                        filters,
+                        /*computeInvoke*/false);
+
+                    return ret;
+                }
+            };
+
+            if (fut.isDone()) {
+                try {
+                    return nonInterruptable(plc1.apply(fut.get(), null));
+                }
+                catch (GridClosureException e) {
+                    return new GridFinishedFuture<>(e.unwrap());
+                }
+                catch (IgniteCheckedException e) {
+                    try {
+                        return nonInterruptable(plc1.apply(false, e));
+                    }
+                    catch (Exception e1) {
+                        return new GridFinishedFuture<>(e1);
+                    }
+                }
+            }
+            else
+                return nonInterruptable(new GridEmbeddedFuture<>(
+                    fut,
+                    plc1
+                ));
+        }
+        else {
+            if (implicit()) {
+                // Should never load missing values for implicit transaction as values will be returned
+                // with prepare response, if required.
+                assert loadFut.isDone();
+
+                return nonInterruptable(commitNearTxLocalAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                    @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                        throws IgniteCheckedException {
+                        try {
+                            txFut.get();
+
+                            return new GridCacheReturn(cacheCtx, true, keepBinary,
+                                implicitRes.value(), implicitRes.success());
+                        }
+                        catch (IgniteCheckedException | RuntimeException e) {
+                            rollbackNearTxLocalAsync();
+
+                            throw e;
+                        }
+                    }
+                }));
+            }
+            else {
+                return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+                    @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+                        throws IgniteCheckedException {
+                        f.get();
+
+                        return ret;
+                    }
+                }));
+            }
+        }
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys to get.
+     * @param deserializeBinary Deserialize binary flag.
+     * @param skipVals Skip values flag.
+     * @param keepCacheObjects Keep cache objects
+     * @param skipStore Skip store flag.
+     * @return Future for this get.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
+        final GridCacheContext cacheCtx,
+        @Nullable final AffinityTopologyVersion entryTopVer,
+        Collection<KeyCacheObject> keys,
+        final boolean deserializeBinary,
+        final boolean skipVals,
+        final boolean keepCacheObjects,
+        final boolean skipStore,
+        final boolean needVer) {
+        if (F.isEmpty(keys))
+            return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+
+        init();
+
+        int keysCnt = keys.size();
+
+        boolean single = keysCnt == 1;
+
+        try {
+            checkValid();
+
+            final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+
+            final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+
+            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+            ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+            final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+                entryTopVer,
+                keys,
+                expiryPlc,
+                retMap,
+                missed,
+                keysCnt,
+                deserializeBinary,
+                skipVals,
+                keepCacheObjects,
+                skipStore,
+                needVer);
+
+            if (single && missed.isEmpty())
+                return new GridFinishedFuture<>(retMap);
+
+            // Handle locks.
+            if (pessimistic() && !readCommitted() && !skipVals) {
+                if (expiryPlc == null)
+                    expiryPlc = cacheCtx.expiry();
+
+                long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
+                long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
+
+                long timeout = remainingTime();
+
+                if (timeout == -1)
+                    return new GridFinishedFuture<>(timeoutException());
+
+                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+                    timeout,
+                    this,
+                    true,
+                    true,
+                    isolation,
+                    isInvalidate(),
+                    createTtl,
+                    accessTtl);
+
+                final ExpiryPolicy expiryPlc0 = expiryPlc;
+
+                PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+                    @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+                        if (log.isDebugEnabled())
+                            log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+
+                        // Load keys only after the locks have been acquired.
+                        for (KeyCacheObject cacheKey : lockKeys) {
+                            K keyVal = (K)
+                                (keepCacheObjects ? cacheKey :
+                                    cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+
+                            if (retMap.containsKey(keyVal))
+                                // We already have a return value.
+                                continue;
+
+                            IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+                            IgniteTxEntry txEntry = entry(txKey);
+
+                            assert txEntry != null;
+
+                            // Check if there is cached value.
+                            while (true) {
+                                GridCacheEntryEx cached = txEntry.cached();
+
+                                CacheObject val = null;
+                                GridCacheVersion readVer = null;
+                                EntryGetResult getRes = null;
+
+                                try {
+                                    Object transformClo =
+                                        (!F.isEmpty(txEntry.entryProcessors()) &&
+                                            cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                            F.first(txEntry.entryProcessors()) : null;
+
+                                    if (needVer) {
+                                        getRes = cached.innerGetVersioned(
+                                            null,
+                                            GridNearTxLocal.this,
+                                            /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
+                                            /*unmarshal*/true,
+                                            /*update-metrics*/true,
+                                            /*event*/!skipVals,
+                                            CU.subjectId(GridNearTxLocal.this, cctx),
+                                            transformClo,
+                                            resolveTaskName(),
+                                            null,
+                                            txEntry.keepBinary(),
+                                            null);
+
+                                        if (getRes != null) {
+                                            val = getRes.value();
+                                            readVer = getRes.version();
+                                        }
+                                    }
+                                    else{
+                                        val = cached.innerGet(
+                                            null,
+                                            GridNearTxLocal.this,
+                                            cacheCtx.isSwapOrOffheapEnabled(),
+                                            /*read-through*/false,
+                                            /*metrics*/true,
+                                            /*events*/!skipVals,
+                                            /*temporary*/false,
+                                            CU.subjectId(GridNearTxLocal.this, cctx),
+                                            transformClo,
+                                            resolveTaskName(),
+                                            null,
+                                            txEntry.keepBinary());
+                                    }
+
+                                    // If value is in cache and passed the filter.
+                                    if (val != null) {
+                                        missed.remove(cacheKey);
+
+                                        txEntry.setAndMarkValid(val);
+
+                                        if (!F.isEmpty(txEntry.entryProcessors()))
+                                            val = txEntry.applyEntryProcessors(val);
+
+                                        cacheCtx.addResult(retMap,
+                                            cacheKey,
+                                            val,
+                                            skipVals,
+                                            keepCacheObjects,
+                                            deserializeBinary,
+                                            false,
+                                            getRes,
+                                            readVer,
+                                            0,
+                                            0,
+                                            needVer);
+
+                                        if (readVer != null)
+                                            txEntry.entryReadVersion(readVer);
+                                    }
+
+                                    // Even though we bring the value back from lock acquisition,
+                                    // we still need to recheck primary node for consistent values
+                                    // in case of concurrent transactional locks.
+
+                                    break; // While.
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got removed exception in get postLock (will retry): " +
+                                            cached);
+
+                                    txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
+                                }
+                            }
+                        }
+
+                        if (!missed.isEmpty() && cacheCtx.isLocal()) {
+                            AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                            if (topVer == null)
+                                topVer = entryTopVer;
+
+                            return checkMissed(cacheCtx,
+                                topVer != null ? topVer : topologyVersion(),
+                                retMap,
+                                missed,
+                                deserializeBinary,
+                                skipVals,
+                                keepCacheObjects,
+                                skipStore,
+                                needVer,
+                                expiryPlc0);
+                        }
+
+                        return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+                    }
+                };
+
+                FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+                    @Override Map<K, V> finish(Map<K, V> loaded) {
+                        retMap.putAll(loaded);
+
+                        return retMap;
+                    }
+                };
+
+                if (fut.isDone()) {
+                    try {
+                        IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+
+                        return fut1.isDone() ?
+                            new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
+                            new GridEmbeddedFuture<>(finClos, fut1);
+                    }
+                    catch (GridClosureException e) {
+                        return new GridFinishedFuture<>(e.unwrap());
+                    }
+                    catch (IgniteCheckedException e) {
+                        try {
+                            return plc2.apply(false, e);
+                        }
+                        catch (Exception e1) {
+                            return new GridFinishedFuture<>(e1);
+                        }
+                    }
+                }
+                else {
+                    return new GridEmbeddedFuture<>(
+                        fut,
+                        plc2,
+                        finClos);
+                }
+            }
+            else {
+                assert optimistic() || readCommitted() || skipVals;
+
+                if (!missed.isEmpty()) {
+                    if (!readCommitted())
+                        for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
+                            KeyCacheObject cacheKey = it.next();
+
+                            K keyVal =
+                                (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
+
+                            if (retMap.containsKey(keyVal))
+                                it.remove();
+                        }
+
+                    if (missed.isEmpty())
+                        return new GridFinishedFuture<>(retMap);
+
+                    AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                    if (topVer == null)
+                        topVer = entryTopVer;
+
+                    return checkMissed(cacheCtx,
+                        topVer != null ? topVer : topologyVersion(),
+                        retMap,
+                        missed,
+                        deserializeBinary,
+                        skipVals,
+                        keepCacheObjects,
+                        skipStore,
+                        needVer,
+                        expiryPlc);
+                }
+
+                return new GridFinishedFuture<>(retMap);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            setRollbackOnly();
+
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Key to enlist.
+     * @param expiryPlc Explicitly specified expiry policy for entry.
+     * @param map Return map.
+     * @param missed Map of missed keys.
+     * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+     * @param deserializeBinary Deserialize binary flag.
+     * @param skipVals Skip values flag.
+     * @param keepCacheObjects Keep cache objects flag.
+     * @param skipStore Skip store flag.
+     * @throws IgniteCheckedException If failed.
+     * @return Enlisted keys.
+     */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    private <K, V> Collection<KeyCacheObject> enlistRead(
+        final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
+        Collection<KeyCacheObject> keys,
+        @Nullable ExpiryPolicy expiryPlc,
+        Map<K, V> map,
+        Map<KeyCacheObject, GridCacheVersion> missed,
+        int keysCnt,
+        boolean deserializeBinary,
+        boolean skipVals,
+        boolean keepCacheObjects,
+        boolean skipStore,
+        final boolean needVer
+    ) throws IgniteCheckedException {
+        assert !F.isEmpty(keys);
+        assert keysCnt == keys.size();
+
+        cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        boolean single = keysCnt == 1;
+
+        Collection<KeyCacheObject> lockKeys = null;
+
+        AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
+
+        boolean needReadVer = (serializable() && optimistic()) || needVer;
+
+        // In this loop we cover only read-committed or optimistic transactions.
+        // Transactions that are pessimistic and not read-committed are covered
+        // outside of this loop.
+        for (KeyCacheObject key : keys) {
+            if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
+                addActiveCache(cacheCtx);
+
+            IgniteTxKey txKey = cacheCtx.txKey(key);
+
+            // Check write map (always check writes first).
+            IgniteTxEntry txEntry = entry(txKey);
+
+            // Either non-read-committed or there was a previous write.
+            if (txEntry != null) {
+                CacheObject val = txEntry.value();
+
+                if (txEntry.hasValue()) {
+                    if (!F.isEmpty(txEntry.entryProcessors()))
+                        val = txEntry.applyEntryProcessors(val);
+
+                    if (val != null) {
+                        GridCacheVersion ver = null;
+
+                        if (needVer) {
+                            if (txEntry.op() != READ)
+                                ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+                            else {
+                                ver = txEntry.entryReadVersion();
+
+                                if (ver == null && pessimistic()) {
+                                    while (true) {
+                                        try {
+                                            GridCacheEntryEx cached = txEntry.cached();
+
+                                            ver = cached.isNear() ?
+                                                ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+
+                                            break;
+                                        }
+                                        catch (GridCacheEntryRemovedException ignored) {
+                                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                                        }
+                                    }
+                                }
+
+                                if (ver == null) {
+                                    assert optimistic() && repeatableRead() : this;
+
+                                    ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+                                }
+                            }
+
+                            assert ver != null;
+                        }
+
+                        cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
+                            ver, 0, 0);
+                    }
+                }
+                else {
+                    assert txEntry.op() == TRANSFORM;
+
+                    while (true) {
+                        try {
+                            GridCacheVersion readVer = null;
+                            EntryGetResult getRes = null;
+
+                            Object transformClo =
+                                (txEntry.op() == TRANSFORM &&
+                                    cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                    F.first(txEntry.entryProcessors()) : null;
+
+                            if (needVer) {
+                                getRes = txEntry.cached().innerGetVersioned(
+                                    null,
+                                    this,
+                                    /*swap*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/true,
+                                    /*event*/!skipVals,
+                                    CU.subjectId(this, cctx),
+                                    transformClo,
+                                    resolveTaskName(),
+                                    null,
+                                    txEntry.keepBinary(),
+                                    null);
+
+                                if (getRes != null) {
+                                    val = getRes.value();
+                                    readVer = getRes.version();
+                                }
+                            }
+                            else {
+                                val = txEntry.cached().innerGet(
+                                    null,
+                                    this,
+                                    /*swap*/true,
+                                    /*read-through*/false,
+                                    /*metrics*/true,
+                                    /*event*/!skipVals,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    transformClo,
+                                    resolveTaskName(),
+                                    null,
+                                    txEntry.keepBinary());
+                            }
+
+                            if (val != null) {
+                                if (!readCommitted() && !skipVals)
+                                    txEntry.readValue(val);
+
+                                if (!F.isEmpty(txEntry.entryProcessors()))
+                                    val = txEntry.applyEntryProcessors(val);
+
+                                cacheCtx.addResult(map,
+                                    key,
+                                    val,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializeBinary,
+                                    false,
+                                    getRes,
+                                    readVer,
+                                    0,
+                                    0,
+                                    needVer);
+                            }
+                            else
+                                missed.put(key, txEntry.cached().version());
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException ignored) {
+                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                        }
+                    }
+                }
+            }
+            // First time access within transaction.
+            else {
+                if (lockKeys == null && !skipVals)
+                    lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
+
+                if (!single && !skipVals)
+                    lockKeys.add(key);
+
+                while (true) {
+                    GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+
+                    try {
+                        GridCacheVersion ver = entry.version();
+
+                        CacheObject val = null;
+                        GridCacheVersion readVer = null;
+                        EntryGetResult getRes = null;
+
+                        if (!pessimistic() || readCommitted() && !skipVals) {
+                            IgniteCacheExpiryPolicy accessPlc =
+                                optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+
+                            if (needReadVer) {
+                                getRes = primaryLocal(entry) ?
+                                    entry.innerGetVersioned(
+                                        null,
+                                        this,
+                                        /*swap*/true,
+                                        /*unmarshal*/true,
+                                        /*metrics*/true,
+                                        /*event*/true,
+                                        CU.subjectId(this, cctx),
+                                        null,
+                                        resolveTaskName(),
+                                        accessPlc,
+                                        !deserializeBinary,
+                                        null) : null;
+
+                                if (getRes != null) {
+                                    val = getRes.value();
+                                    readVer = getRes.version();
+                                }
+                            }
+                            else {
+                                val = entry.innerGet(
+                                    null,
+                                    this,
+                                    /*swap*/true,
+                                    /*read-through*/false,
+                                    /*metrics*/true,
+                                    /*event*/true,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    null,
+                                    resolveTaskName(),
+                                    accessPlc,
+                                    !deserializeBinary);
+                            }
+
+                            if (val != null) {
+                                cacheCtx.addResult(map,
+                                    key,
+                                    val,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializeBinary,
+                                    false,
+                                    getRes,
+                                    readVer,
+                                    0,
+                                    0,
+                                    needVer);
+                            }
+                            else
+                                missed.put(key, ver);
+                        }
+                        else
+                            // We must wait for the lock in pessimistic mode.
+                            missed.put(key, ver);
+
+                        if (!readCommitted() && !skipVals) {
+                            txEntry = addEntry(READ,
+                                val,
+                                null,
+                                null,
+                                entry,
+                                expiryPlc,
+                                null,
+                                true,
+                                -1L,
+                                -1L,
+                                null,
+                                skipStore,
+                                !deserializeBinary);
+
+                            // As optimization, mark as checked immediately
+                            // for non-pessimistic if value is not null.
+                            if (val != null && !pessimistic()) {
+                                txEntry.markValid();
+
+                                if (needReadVer) {
+                                    assert readVer != null;
+
+                                    txEntry.entryReadVersion(readVer);
+                                }
+                            }
+                        }
+
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+                    }
+                    finally {
+                        if (entry != null && readCommitted()) {
+                            if (cacheCtx.isNear()) {
+                                if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
+                                    if (entry.markObsolete(xidVer))
+                                        cacheCtx.cache().removeEntry(entry);
+                                }
+                            }
+                            else
+                                entry.context().evicts().touch(entry, topVer);
+                        }
+                    }
+                }
+            }
+        }
+
+        return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys to load.
+     * @param filter Filter.
+     * @param ret Return value.
+     * @param needReadVer Read version flag.
+     * @param singleRmv {@code True} for single remove operation.
+     * @param hasFilters {@code True} if filters not empty.
+     * @param readThrough Read through flag.
+     * @param retval Return value flag.
+     * @param expiryPlc Expiry policy.
+     * @return Load future.
+     */
+    private IgniteInternalFuture<Void> loadMissing(
+        final GridCacheContext cacheCtx,
+        final AffinityTopologyVersion topVer,
+        final Set<KeyCacheObject> keys,
+        final CacheEntryPredicate[] filter,
+        final GridCacheReturn ret,
+        final boolean needReadVer,
+        final boolean singleRmv,
+        final boolean hasFilters,
+        final boolean readThrough,
+        final boolean retval,
+        final boolean keepBinary,
+        final ExpiryPolicy expiryPlc) {
+        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+            new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+                @Override public void apply(KeyCacheObject key,
+                                            @Nullable Object val,
+                                            @Nullable GridCacheVersion loadVer) {
+                    if (log.isDebugEnabled())
+                        log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+                    IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+
+                    assert e != null;
+
+                    if (needReadVer) {
+                        assert loadVer != null;
+
+                        e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+                    }
+
+                    if (singleRmv) {
+                        assert !hasFilters && !retval;
+                        assert val == null || Boolean.TRUE.equals(val) : val;
+
+                        ret.set(cacheCtx, null, val != null, keepBinary);
+                    }
+                    else {
+                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+                        if (e.op() == TRANSFORM) {
+                            GridCacheVersion ver;
+
+                            e.readValue(cacheVal);
+
+                            try {
+                                ver = e.cached().version();
+                            }
+                            catch (GridCacheEntryRemovedException ex) {
+                                assert optimistic() : e;
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+
+                                ver = null;
+                            }
+
+                            addInvokeResult(e, cacheVal, ret, ver);
+                        }
+                        else {
+                            boolean success;
+
+                            if (hasFilters) {
+                                success = isAll(e.context(), key, cacheVal, filter);
+
+                                if (!success)
+                                    e.value(cacheVal, false, false);
+                            }
+                            else
+                                success = true;
+
+                            ret.set(cacheCtx, cacheVal, success, keepBinary);
+                        }
+                    }
+                }
+            };
+
+        return loadMissing(
+            cacheCtx,
+            topVer,
+            readThrough,
+            /*async*/true,
+            keys,
+            /*skipVals*/singleRmv,
+            needReadVer,
+            keepBinary,
+            expiryPlc,
+            c);
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param loadFut Missing keys load future.
+     * @param ret Future result.
+     * @param keepBinary Keep binary flag.
+     * @return Future.
+     */
+    private IgniteInternalFuture optimisticPutFuture(
+        final GridCacheContext cacheCtx,
+        IgniteInternalFuture<Void> loadFut,
+        final GridCacheReturn ret,
+        final boolean keepBinary
+    ) {
+        if (implicit()) {
+            // Should never load missing values for implicit transaction as values will be returned
+            // with prepare response, if required.
+            assert loadFut.isDone();
+
+            try {
+                loadFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
+            }
+
+            return nonInterruptable(commitNearTxLocalAsync().chain(
+                new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                    @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                        throws IgniteCheckedException {
+                        try {
+                            txFut.get();
+
+                            Object res = implicitRes.value();
+
+                            if (implicitRes.invokeResult()) {
+                                assert res == null || res instanceof Map : implicitRes;
 
-    /**
-     * @return If backup check was requested.
-     */
-    public boolean needCheckBackup() {
-        return needCheckBackup != null;
-    }
+                                res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary);
+                            }
 
-    /**
-     * @return {@code True} if transaction contains at least one near cache key mapped to the local node.
-     */
-    public boolean nearLocallyMapped() {
-        return nearLocallyMapped;
-    }
+                            return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
+                        }
+                        catch (IgniteCheckedException | RuntimeException e) {
+                            if (!(e instanceof NodeStoppingException))
+                                rollbackNearTxLocalAsync();
 
-    /**
-     * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local

<TRUNCATED>