You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2017/03/25 00:12:43 UTC

[03/56] [abbrv] ignite git commit: Internal cache API cleanup.

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b1a4003..dc4e52f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -18,33 +18,24 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.io.Externalizable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import javax.cache.Cache;
-import javax.cache.CacheException;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -58,9 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -68,36 +56,26 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
@@ -105,8 +83,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
-import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
-import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -164,6 +140,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @GridToStringInclude
     protected IgniteTxLocalState txState;
 
+    /** */
+    protected CacheWriteSynchronizationMode syncMode;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -224,6 +203,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
     }
 
+    /**
+     * @return Transaction write synchronization mode.
+     */
+    public final CacheWriteSynchronizationMode syncMode() {
+        if (syncMode != null)
+            return syncMode;
+
+        return txState().syncMode(cctx);
+    }
+
+    /**
+     * @param syncMode Write synchronization mode.
+     */
+    public void syncMode(CacheWriteSynchronizationMode syncMode) {
+        this.syncMode = syncMode;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteTxState txState() {
         return txState;
@@ -391,142 +387,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         return null;
     }
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> loadMissing(
-        final GridCacheContext cacheCtx,
-        final AffinityTopologyVersion topVer,
-        final boolean readThrough,
-        boolean async,
-        final Collection<KeyCacheObject> keys,
-        boolean skipVals,
-        boolean needVer,
-        boolean keepBinary,
-        final ExpiryPolicy expiryPlc,
-        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
-    ) {
-        assert cacheCtx.isLocal() : cacheCtx.name();
-
-        if (!readThrough || !cacheCtx.readThrough()) {
-            for (KeyCacheObject key : keys)
-                c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
-
-            return new GridFinishedFuture<>();
-        }
-
-        try {
-            IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
-                accessPolicy(cacheCtx, keys) :
-                cacheCtx.cache().expiryPolicy(expiryPlc);
-
-            Map<KeyCacheObject, GridCacheVersion> misses = null;
-
-            for (KeyCacheObject key : keys) {
-                while (true) {
-                    IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
-                    GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
-                        txEntry.cached();
-
-                    if (entry == null)
-                        continue;
-
-                    try {
-                        EntryGetResult res = entry.innerGetVersioned(
-                            null,
-                            this,
-                            /*readSwap*/true,
-                            /*unmarshal*/true,
-                            /*update-metrics*/!skipVals,
-                            /*event*/!skipVals,
-                            CU.subjectId(this, cctx),
-                            null,
-                            resolveTaskName(),
-                            expiryPlc0,
-                            txEntry == null ? keepBinary : txEntry.keepBinary(),
-                            null);
-
-                        if (res == null) {
-                            if (misses == null)
-                                misses = new LinkedHashMap<>();
-
-                            misses.put(key, entry.version());
-                        }
-                        else
-                            c.apply(key, skipVals ? true : res.value(), res.version());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry, will retry: " + key);
-
-                        if (txEntry != null)
-                            txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
-                    }
-                }
-            }
-
-            if (misses != null) {
-                final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
-
-                cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
-                    @Override public void apply(KeyCacheObject key, Object val) {
-                        GridCacheVersion ver = misses0.remove(key);
-
-                        assert ver != null : key;
-
-                        if (val != null) {
-                            CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
-                            while (true) {
-                                GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
-
-                                try {
-                                    EntryGetResult verVal = entry.versionedValue(cacheVal,
-                                        ver,
-                                        null,
-                                        null,
-                                        null);
-
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Set value loaded from store into entry [" +
-                                            "oldVer=" + ver +
-                                            ", newVer=" + verVal.version() +
-                                            ", entry=" + entry + ']');
-                                    }
-
-                                    ver = verVal.version();
-
-                                    break;
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed entry, (will retry): " + entry);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    // Wrap errors (will be unwrapped).
-                                    throw new GridClosureException(e);
-                                }
-                            }
-                        }
-                        else
-                            ver = SER_READ_EMPTY_ENTRY_VER;
-
-                        c.apply(key, val, ver);
-                    }
-                });
-
-                for (KeyCacheObject key : misses0.keySet())
-                    c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
-            }
-
-            return new GridFinishedFuture<>();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
     /**
      * Gets minimum version present in transaction.
      *
@@ -571,21 +431,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void commit() throws IgniteCheckedException {
-        try {
-            commitAsync().get();
-        }
-        finally {
-            cctx.tm().resetContext();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepare() throws IgniteCheckedException {
-        prepareAsync().get();
-    }
-
     /**
      * Checks that locks are in proper state for commit.
      *
@@ -1103,2484 +948,226 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * @param entry Entry.
-     * @return {@code True} if local node is current primary for given entry.
+     * @param ctx Cache context.
+     * @param key Key.
+     * @param expiryPlc Expiry policy.
+     * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
      */
-    private boolean primaryLocal(GridCacheEntryEx entry) {
-        return entry.context().affinity().primaryByPartition(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE);
+    protected IgniteCacheExpiryPolicy accessPolicy(
+        GridCacheContext ctx,
+        IgniteTxKey key,
+        @Nullable ExpiryPolicy expiryPlc
+    ) {
+        return null;
     }
 
     /**
      * @param cacheCtx Cache context.
-     * @param keys Key to enlist.
-     * @param expiryPlc Explicitly specified expiry policy for entry.
-     * @param map Return map.
-     * @param missed Map of missed keys.
-     * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
-     * @param deserializeBinary Deserialize binary flag.
-     * @param skipVals Skip values flag.
-     * @param keepCacheObjects Keep cache objects flag.
-     * @param skipStore Skip store flag.
-     * @throws IgniteCheckedException If failed.
-     * @return Enlisted keys.
+     * @param keys Keys.
+     * @return Expiry policy.
      */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    private <K, V> Collection<KeyCacheObject> enlistRead(
-        final GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Collection<KeyCacheObject> keys,
-        @Nullable ExpiryPolicy expiryPlc,
-        Map<K, V> map,
-        Map<KeyCacheObject, GridCacheVersion> missed,
-        int keysCnt,
-        boolean deserializeBinary,
-        boolean skipVals,
-        boolean keepCacheObjects,
-        boolean skipStore,
-        final boolean needVer
-    ) throws IgniteCheckedException {
-        assert !F.isEmpty(keys);
-        assert keysCnt == keys.size();
-
-        cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
-
-        boolean single = keysCnt == 1;
-
-        Collection<KeyCacheObject> lockKeys = null;
-
-        AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
-
-        boolean needReadVer = (serializable() && optimistic()) || needVer;
-
-        // In this loop we cover only read-committed or optimistic transactions.
-        // Transactions that are pessimistic and not read-committed are covered
-        // outside of this loop.
-        for (KeyCacheObject key : keys) {
-            if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
-                addActiveCache(cacheCtx);
-
-            IgniteTxKey txKey = cacheCtx.txKey(key);
-
-            // Check write map (always check writes first).
-            IgniteTxEntry txEntry = entry(txKey);
-
-            // Either non-read-committed or there was a previous write.
-            if (txEntry != null) {
-                CacheObject val = txEntry.value();
-
-                if (txEntry.hasValue()) {
-                    if (!F.isEmpty(txEntry.entryProcessors()))
-                        val = txEntry.applyEntryProcessors(val);
-
-                    if (val != null) {
-                        GridCacheVersion ver = null;
-
-                        if (needVer) {
-                            if (txEntry.op() != READ)
-                                ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
-                            else {
-                                ver = txEntry.entryReadVersion();
+    protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+        return null;
+    }
 
-                                if (ver == null && pessimistic()) {
-                                    while (true) {
-                                        try {
-                                            GridCacheEntryEx cached = txEntry.cached();
+    /**
+     * Post lock processing for put or remove.
+     *
+     * @param cacheCtx Context.
+     * @param keys Keys.
+     * @param ret Return value.
+     * @param rmv {@code True} if remove.
+     * @param retval Flag to return value or not.
+     * @param read {@code True} if read.
+     * @param accessTtl TTL for read operation.
+     * @param filter Filter to check entries.
+     * @throws IgniteCheckedException If error.
+     * @param computeInvoke If {@code true} computes return value for invoke operation.
+     */
+    @SuppressWarnings("unchecked")
+    protected final void postLockWrite(
+        GridCacheContext cacheCtx,
+        Iterable<KeyCacheObject> keys,
+        GridCacheReturn ret,
+        boolean rmv,
+        boolean retval,
+        boolean read,
+        long accessTtl,
+        CacheEntryPredicate[] filter,
+        boolean computeInvoke
+    ) throws IgniteCheckedException {
+        for (KeyCacheObject k : keys) {
+            IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
 
-                                            ver = cached.isNear() ?
-                                                ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+            if (txEntry == null)
+                throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+                    "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
 
-                                            break;
-                                        }
-                                        catch (GridCacheEntryRemovedException ignored) {
-                                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
-                                        }
-                                    }
-                                }
+            while (true) {
+                GridCacheEntryEx cached = txEntry.cached();
 
-                                if (ver == null) {
-                                    assert optimistic() && repeatableRead() : this;
+                try {
+                    assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+                        "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+                            ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
 
-                                    ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
-                                }
-                            }
+                    if (log.isDebugEnabled())
+                        log.debug("Post lock write entry: " + cached);
 
-                            assert ver != null;
-                        }
+                    CacheObject v = txEntry.previousValue();
+                    boolean hasPrevVal = txEntry.hasPreviousValue();
 
-                        cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
-                            ver, 0, 0);
-                    }
-                }
-                else {
-                    assert txEntry.op() == TRANSFORM;
+                    if (onePhaseCommit())
+                        filter = txEntry.filters();
 
-                    while (true) {
-                        try {
-                            GridCacheVersion readVer = null;
-                            EntryGetResult getRes = null;
+                    // If we have user-passed filter, we must read value into entry for peek().
+                    if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+                        retval = true;
 
-                            Object transformClo =
-                                (txEntry.op() == TRANSFORM &&
-                                    cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                    F.first(txEntry.entryProcessors()) : null;
+                    boolean invoke = txEntry.op() == TRANSFORM;
 
-                            if (needVer) {
-                                getRes = txEntry.cached().innerGetVersioned(
-                                    null,
-                                    this,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /*update-metrics*/true,
-                                    /*event*/!skipVals,
-                                    CU.subjectId(this, cctx),
-                                    transformClo,
-                                    resolveTaskName(),
-                                    null,
-                                    txEntry.keepBinary(),
-                                    null);
+                    if (retval || invoke) {
+                        if (!cacheCtx.isNear()) {
+                            if (!hasPrevVal) {
+                                // For non-local cache should read from store after lock on primary.
+                                boolean readThrough = cacheCtx.isLocal() &&
+                                    (invoke || cacheCtx.loadPreviousValue()) &&
+                                    !txEntry.skipStore();
 
-                                if (getRes != null) {
-                                    val = getRes.value();
-                                    readVer = getRes.version();
-                                }
-                            }
-                            else {
-                                val = txEntry.cached().innerGet(
+                                v = cached.innerGet(
                                     null,
                                     this,
                                     /*swap*/true,
-                                    /*read-through*/false,
-                                    /*metrics*/true,
-                                    /*event*/!skipVals,
+                                    readThrough,
+                                    /*metrics*/!invoke,
+                                    /*event*/!invoke && !dht(),
                                     /*temporary*/false,
                                     CU.subjectId(this, cctx),
-                                    transformClo,
+                                    null,
                                     resolveTaskName(),
                                     null,
                                     txEntry.keepBinary());
                             }
-
-                            if (val != null) {
-                                if (!readCommitted() && !skipVals)
-                                    txEntry.readValue(val);
-
-                                if (!F.isEmpty(txEntry.entryProcessors()))
-                                    val = txEntry.applyEntryProcessors(val);
-
-                                cacheCtx.addResult(map,
-                                    key,
-                                    val,
-                                    skipVals,
-                                    keepCacheObjects,
-                                    deserializeBinary,
-                                    false,
-                                    getRes,
-                                    readVer,
-                                    0,
-                                    0,
-                                    needVer);
-                            }
-                            else
-                                missed.put(key, txEntry.cached().version());
-
-                            break;
                         }
-                        catch (GridCacheEntryRemovedException ignored) {
-                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                        else {
+                            if (!hasPrevVal)
+                                v = cached.rawGetOrUnmarshal(false);
                         }
-                    }
-                }
-            }
-            // First time access within transaction.
-            else {
-                if (lockKeys == null && !skipVals)
-                    lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
 
-                if (!single && !skipVals)
-                    lockKeys.add(key);
+                        if (txEntry.op() == TRANSFORM) {
+                            if (computeInvoke) {
+                                GridCacheVersion ver;
+
+                                try {
+                                    ver = cached.version();
+                                }
+                                catch (GridCacheEntryRemovedException e) {
+                                    assert optimistic() : txEntry;
 
-                while (true) {
-                    GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
 
-                    try {
-                        GridCacheVersion ver = entry.version();
-
-                        CacheObject val = null;
-                        GridCacheVersion readVer = null;
-                        EntryGetResult getRes = null;
-
-                        if (!pessimistic() || readCommitted() && !skipVals) {
-                            IgniteCacheExpiryPolicy accessPlc =
-                                optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
-
-                            if (needReadVer) {
-                                getRes = primaryLocal(entry) ?
-                                    entry.innerGetVersioned(
-                                        null,
-                                        this,
-                                        /*swap*/true,
-                                        /*unmarshal*/true,
-                                        /*metrics*/true,
-                                        /*event*/true,
-                                        CU.subjectId(this, cctx),
-                                        null,
-                                        resolveTaskName(),
-                                        accessPlc,
-                                        !deserializeBinary,
-                                        null) : null;
-
-                                if (getRes != null) {
-                                    val = getRes.value();
-                                    readVer = getRes.version();
+                                    ver = null;
                                 }
-                            }
-                            else {
-                                val = entry.innerGet(
-                                    null,
-                                    this,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*metrics*/true,
-                                    /*event*/true,
-                                    /*temporary*/false,
-                                    CU.subjectId(this, cctx),
-                                    null,
-                                    resolveTaskName(),
-                                    accessPlc,
-                                    !deserializeBinary);
-                            }
 
-                            if (val != null) {
-                                cacheCtx.addResult(map,
-                                    key,
-                                    val,
-                                    skipVals,
-                                    keepCacheObjects,
-                                    deserializeBinary,
-                                    false,
-                                    getRes,
-                                    readVer,
-                                    0,
-                                    0,
-                                    needVer);
+                                addInvokeResult(txEntry, v, ret, ver);
                             }
-                            else
-                                missed.put(key, ver);
                         }
                         else
-                            // We must wait for the lock in pessimistic mode.
-                            missed.put(key, ver);
-
-                        if (!readCommitted() && !skipVals) {
-                            txEntry = addEntry(READ,
-                                val,
-                                null,
-                                null,
-                                entry,
-                                expiryPlc,
-                                null,
-                                true,
-                                -1L,
-                                -1L,
-                                null,
-                                skipStore,
-                                !deserializeBinary);
-
-                            // As optimization, mark as checked immediately
-                            // for non-pessimistic if value is not null.
-                            if (val != null && !pessimistic()) {
-                                txEntry.markValid();
-
-                                if (needReadVer) {
-                                    assert readVer != null;
-
-                                    txEntry.entryReadVersion(readVer);
-                                }
-                            }
-                        }
-
-                        break; // While.
+                            ret.value(cacheCtx, v, txEntry.keepBinary());
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
+
+                    boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
+
+                    // For remove operation we return true only if we are removing s/t,
+                    // i.e. cached value is not null.
+                    ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+
+                    if (onePhaseCommit())
+                        txEntry.filtersPassed(pass);
+
+                    boolean updateTtl = read;
+
+                    if (pass) {
+                        txEntry.markValid();
+
                         if (log.isDebugEnabled())
-                            log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+                            log.debug("Filter passed in post lock for key: " + k);
                     }
-                    finally {
-                        if (entry != null && readCommitted()) {
-                            if (cacheCtx.isNear()) {
-                                if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
-                                    if (entry.markObsolete(xidVer))
-                                        cacheCtx.cache().removeEntry(entry);
-                                }
-                            }
-                            else
-                                entry.context().evicts().touch(entry, topVer);
-                        }
+                    else {
+                        // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
+                        txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
+                        txEntry.filters(CU.empty0());
+                        txEntry.filtersSet(false);
+
+                        updateTtl = !cacheCtx.putIfAbsentFilter(filter);
                     }
-                }
-            }
-        }
 
-        return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
-    }
+                    if (updateTtl) {
+                        if (!read) {
+                            ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
 
-    /**
-     * @param ctx Cache context.
-     * @param key Key.
-     * @param expiryPlc Expiry policy.
-     * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
-     */
-    protected IgniteCacheExpiryPolicy accessPolicy(
-        GridCacheContext ctx,
-        IgniteTxKey key,
-        @Nullable ExpiryPolicy expiryPlc
-    ) {
-        return null;
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param keys Keys.
-     * @return Expiry policy.
-     */
-    protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
-        return null;
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param topVer Topology version.
-     * @param map Return map.
-     * @param missedMap Missed keys.
-     * @param deserializeBinary Deserialize binary flag.
-     * @param skipVals Skip values flag.
-     * @param keepCacheObjects Keep cache objects flag.
-     * @param skipStore Skip store flag.
-     * @param expiryPlc Expiry policy.
-     * @return Loaded key-value pairs.
-     */
-    private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
-        final GridCacheContext cacheCtx,
-        final AffinityTopologyVersion topVer,
-        final Map<K, V> map,
-        final Map<KeyCacheObject, GridCacheVersion> missedMap,
-        final boolean deserializeBinary,
-        final boolean skipVals,
-        final boolean keepCacheObjects,
-        final boolean skipStore,
-        final boolean needVer,
-        final ExpiryPolicy expiryPlc
-
-    ) {
-        if (log.isDebugEnabled())
-            log.debug("Loading missed values for missed map: " + missedMap);
-
-        final boolean needReadVer = (serializable() && optimistic()) || needVer;
-
-        return new GridEmbeddedFuture<>(
-            new C2<Void, Exception, Map<K, V>>() {
-                @Override public Map<K, V> apply(Void v, Exception e) {
-                    if (e != null) {
-                        setRollbackOnly();
-
-                        throw new GridClosureException(e);
-                    }
-
-                    return map;
-                }
-            },
-            loadMissing(
-                cacheCtx,
-                topVer,
-                !skipStore,
-                false,
-                missedMap.keySet(),
-                skipVals,
-                needReadVer,
-                !deserializeBinary,
-                expiryPlc,
-                new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
-                    @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
-                        if (isRollbackOnly()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Ignoring loaded value for read because transaction was rolled back: " +
-                                    IgniteTxLocalAdapter.this);
-
-                            return;
-                        }
-
-                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
-                        CacheObject visibleVal = cacheVal;
-
-                        IgniteTxKey txKey = cacheCtx.txKey(key);
-
-                        IgniteTxEntry txEntry = entry(txKey);
-
-                        if (txEntry != null) {
-                            if (!readCommitted())
-                                txEntry.readValue(cacheVal);
-
-                            if (!F.isEmpty(txEntry.entryProcessors()))
-                                visibleVal = txEntry.applyEntryProcessors(visibleVal);
-                        }
-
-                        assert txEntry != null || readCommitted() || skipVals;
-
-                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached();
-
-                        if (readCommitted() || skipVals) {
-                            cacheCtx.evicts().touch(e, topologyVersion());
-
-                            if (visibleVal != null) {
-                                cacheCtx.addResult(map,
-                                    key,
-                                    visibleVal,
-                                    skipVals,
-                                    keepCacheObjects,
-                                    deserializeBinary,
-                                    false,
-                                    needVer ? loadVer : null,
-                                    0,
-                                    0);
-                            }
-                        }
-                        else {
-                            assert txEntry != null;
-
-                            txEntry.setAndMarkValid(cacheVal);
-
-                            if (needReadVer) {
-                                assert loadVer != null;
-
-                                txEntry.entryReadVersion(loadVer);
-                            }
-
-                            if (visibleVal != null) {
-                                cacheCtx.addResult(map,
-                                    key,
-                                    visibleVal,
-                                    skipVals,
-                                    keepCacheObjects,
-                                    deserializeBinary,
-                                    false,
-                                    needVer ? loadVer : null,
-                                    0,
-                                    0);
-                            }
-                        }
-                    }
-                })
-        );
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
-        final GridCacheContext cacheCtx,
-        @Nullable final AffinityTopologyVersion entryTopVer,
-        Collection<KeyCacheObject> keys,
-        final boolean deserializeBinary,
-        final boolean skipVals,
-        final boolean keepCacheObjects,
-        final boolean skipStore,
-        final boolean needVer) {
-        if (F.isEmpty(keys))
-            return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
-
-        init();
-
-        int keysCnt = keys.size();
-
-        boolean single = keysCnt == 1;
-
-        try {
-            checkValid();
-
-            final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
-
-            final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
-
-            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
-            ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
-
-            final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
-                entryTopVer,
-                keys,
-                expiryPlc,
-                retMap,
-                missed,
-                keysCnt,
-                deserializeBinary,
-                skipVals,
-                keepCacheObjects,
-                skipStore,
-                needVer);
-
-            if (single && missed.isEmpty())
-                return new GridFinishedFuture<>(retMap);
-
-            // Handle locks.
-            if (pessimistic() && !readCommitted() && !skipVals) {
-                if (expiryPlc == null)
-                    expiryPlc = cacheCtx.expiry();
-
-                long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
-                long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
-
-                long timeout = remainingTime();
-
-                if (timeout == -1)
-                    return new GridFinishedFuture<>(timeoutException());
-
-                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
-                    timeout,
-                    this,
-                    true,
-                    true,
-                    isolation,
-                    isInvalidate(),
-                    createTtl,
-                    accessTtl);
-
-                final ExpiryPolicy expiryPlc0 = expiryPlc;
-
-                PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
-                    @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
-                        if (log.isDebugEnabled())
-                            log.debug("Acquired transaction lock for read on keys: " + lockKeys);
-
-                        // Load keys only after the locks have been acquired.
-                        for (KeyCacheObject cacheKey : lockKeys) {
-                            K keyVal = (K)
-                                (keepCacheObjects ? cacheKey :
-                                cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
-
-                            if (retMap.containsKey(keyVal))
-                                // We already have a return value.
-                                continue;
-
-                            IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
-                            IgniteTxEntry txEntry = entry(txKey);
-
-                            assert txEntry != null;
-
-                            // Check if there is cached value.
-                            while (true) {
-                                GridCacheEntryEx cached = txEntry.cached();
-
-                                CacheObject val = null;
-                                GridCacheVersion readVer = null;
-                                EntryGetResult getRes = null;
-
-                                try {
-                                    Object transformClo =
-                                        (!F.isEmpty(txEntry.entryProcessors()) &&
-                                            cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                            F.first(txEntry.entryProcessors()) : null;
-
-                                    if (needVer) {
-                                        getRes = cached.innerGetVersioned(
-                                            null,
-                                            IgniteTxLocalAdapter.this,
-                                            /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
-                                            /*unmarshal*/true,
-                                            /*update-metrics*/true,
-                                            /*event*/!skipVals,
-                                            CU.subjectId(IgniteTxLocalAdapter.this, cctx),
-                                            transformClo,
-                                            resolveTaskName(),
-                                            null,
-                                            txEntry.keepBinary(),
-                                            null);
-
-                                        if (getRes != null) {
-                                            val = getRes.value();
-                                            readVer = getRes.version();
-                                        }
-                                    }
-                                    else{
-                                        val = cached.innerGet(
-                                            null,
-                                            IgniteTxLocalAdapter.this,
-                                            cacheCtx.isSwapOrOffheapEnabled(),
-                                            /*read-through*/false,
-                                            /*metrics*/true,
-                                            /*events*/!skipVals,
-                                            /*temporary*/false,
-                                            CU.subjectId(IgniteTxLocalAdapter.this, cctx),
-                                            transformClo,
-                                            resolveTaskName(),
-                                            null,
-                                            txEntry.keepBinary());
-                                    }
-
-                                    // If value is in cache and passed the filter.
-                                    if (val != null) {
-                                        missed.remove(cacheKey);
-
-                                        txEntry.setAndMarkValid(val);
-
-                                        if (!F.isEmpty(txEntry.entryProcessors()))
-                                            val = txEntry.applyEntryProcessors(val);
-
-                                        cacheCtx.addResult(retMap,
-                                            cacheKey,
-                                            val,
-                                            skipVals,
-                                            keepCacheObjects,
-                                            deserializeBinary,
-                                            false,
-                                            getRes,
-                                            readVer,
-                                            0,
-                                            0,
-                                            needVer);
-
-                                        if (readVer != null)
-                                            txEntry.entryReadVersion(readVer);
-                                    }
-
-                                    // Even though we bring the value back from lock acquisition,
-                                    // we still need to recheck primary node for consistent values
-                                    // in case of concurrent transactional locks.
-
-                                    break; // While.
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed exception in get postLock (will retry): " +
-                                            cached);
-
-                                    txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
-                                }
-                            }
-                        }
-
-                        if (!missed.isEmpty() && cacheCtx.isLocal()) {
-                            AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
-                            if (topVer == null)
-                                topVer = entryTopVer;
-
-                            return checkMissed(cacheCtx,
-                                topVer != null ? topVer : topologyVersion(),
-                                retMap,
-                                missed,
-                                deserializeBinary,
-                                skipVals,
-                                keepCacheObjects,
-                                skipStore,
-                                needVer,
-                                expiryPlc0);
-                        }
-
-                        return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
-                    }
-                };
-
-                FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
-                    @Override Map<K, V> finish(Map<K, V> loaded) {
-                        retMap.putAll(loaded);
-
-                        return retMap;
-                    }
-                };
-
-                if (fut.isDone()) {
-                    try {
-                        IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
-
-                        return fut1.isDone() ?
-                            new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
-                            new GridEmbeddedFuture<>(finClos, fut1);
-                    }
-                    catch (GridClosureException e) {
-                        return new GridFinishedFuture<>(e.unwrap());
-                    }
-                    catch (IgniteCheckedException e) {
-                        try {
-                            return plc2.apply(false, e);
-                        }
-                        catch (Exception e1) {
-                            return new GridFinishedFuture<>(e1);
-                        }
-                    }
-                }
-                else {
-                    return new GridEmbeddedFuture<>(
-                        fut,
-                        plc2,
-                        finClos);
-                }
-            }
-            else {
-                assert optimistic() || readCommitted() || skipVals;
-
-                if (!missed.isEmpty()) {
-                    if (!readCommitted())
-                        for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
-                            KeyCacheObject cacheKey = it.next();
-
-                            K keyVal =
-                                (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
-
-                            if (retMap.containsKey(keyVal))
-                                it.remove();
-                        }
-
-                    if (missed.isEmpty())
-                        return new GridFinishedFuture<>(retMap);
-
-                    AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
-                    if (topVer == null)
-                        topVer = entryTopVer;
-
-                    return checkMissed(cacheCtx,
-                        topVer != null ? topVer : topologyVersion(),
-                        retMap,
-                        missed,
-                        deserializeBinary,
-                        skipVals,
-                        keepCacheObjects,
-                        skipStore,
-                        needVer,
-                        expiryPlc);
-                }
-
-                return new GridFinishedFuture<>(retMap);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            setRollbackOnly();
-
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Map<? extends K, ? extends V> map,
-        boolean retval
-    ) {
-        return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
-            entryTopVer,
-            map,
-            null,
-            null,
-            null,
-            retval);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        K key,
-        V val,
-        boolean retval,
-        CacheEntryPredicate filter) {
-        return putAsync0(cacheCtx,
-            entryTopVer,
-            key,
-            val,
-            null,
-            null,
-            retval,
-            filter);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        K key,
-        EntryProcessor<K, V, Object> entryProcessor,
-        Object... invokeArgs) {
-        return (IgniteInternalFuture)putAsync0(cacheCtx,
-            entryTopVer,
-            key,
-            null,
-            entryProcessor,
-            invokeArgs,
-            true,
-            null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllDrAsync(
-        GridCacheContext cacheCtx,
-        Map<KeyCacheObject, GridCacheDrInfo> drMap
-    ) {
-        Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
-            @Override public Object apply(GridCacheDrInfo val) {
-                return val.value();
-            }
-        });
-
-        return this.<Object, Object>putAllAsync0(cacheCtx,
-            null,
-            map,
-            null,
-            null,
-            drMap,
-            false);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
-        Object... invokeArgs
-    ) {
-        return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
-            entryTopVer,
-            null,
-            map,
-            invokeArgs,
-            null,
-            true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllDrAsync(
-        GridCacheContext cacheCtx,
-        Map<KeyCacheObject, GridCacheVersion> drMap
-    ) {
-        return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
-    }
-
-    /**
-     * Checks filter for non-pessimistic transactions.
-     *
-     * @param cctx Cache context.
-     * @param key Key.
-     * @param val Value.
-     * @param filter Filter to check.
-     * @return {@code True} if passed or pessimistic.
-     */
-    private boolean filter(
-        GridCacheContext cctx,
-        KeyCacheObject key,
-        CacheObject val,
-        CacheEntryPredicate[] filter) {
-        return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param cacheKey Key to enlist.
-     * @param val Value.
-     * @param expiryPlc Explicitly specified expiry policy for entry.
-     * @param entryProcessor Entry processor (for invoke operation).
-     * @param invokeArgs Optional arguments for EntryProcessor.
-     * @param retval Flag indicating whether a value should be returned.
-     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
-     * @param filter User filters.
-     * @param ret Return value.
-     * @param skipStore Skip store flag.
-     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
-     * @return Future for entry values loading.
-     */
-    private <K, V> IgniteInternalFuture<Void> enlistWrite(
-        final GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        KeyCacheObject cacheKey,
-        Object val,
-        @Nullable ExpiryPolicy expiryPlc,
-        @Nullable EntryProcessor<K, V, Object> entryProcessor,
-        @Nullable Object[] invokeArgs,
-        final boolean retval,
-        boolean lockOnly,
-        final CacheEntryPredicate[] filter,
-        final GridCacheReturn ret,
-        boolean skipStore,
-        final boolean singleRmv,
-        boolean keepBinary,
-        Byte dataCenterId) {
-        try {
-            addActiveCache(cacheCtx);
-
-            final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
-            final boolean needVal = singleRmv || retval || hasFilters;
-            final boolean needReadVer = needVal && (serializable() && optimistic());
-
-            if (entryProcessor != null)
-                transform = true;
-
-            GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
-
-            boolean loadMissed = enlistWriteEntry(cacheCtx,
-                entryTopVer,
-                cacheKey,
-                val,
-                entryProcessor,
-                invokeArgs,
-                expiryPlc,
-                retval,
-                lockOnly,
-                filter,
-                /*drVer*/drVer,
-                /*drTtl*/-1L,
-                /*drExpireTime*/-1L,
-                ret,
-                /*enlisted*/null,
-                skipStore,
-                singleRmv,
-                hasFilters,
-                needVal,
-                needReadVer,
-                keepBinary);
-
-            if (loadMissed) {
-                AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
-                if (topVer == null)
-                    topVer = entryTopVer;
-
-                return loadMissing(cacheCtx,
-                    topVer != null ? topVer : topologyVersion(),
-                    Collections.singleton(cacheKey),
-                    filter,
-                    ret,
-                    needReadVer,
-                    singleRmv,
-                    hasFilters,
-                    /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
-                    retval,
-                    keepBinary,
-                    expiryPlc);
-            }
-
-            return new GridFinishedFuture<>();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
-    /**
-     * Internal routine for <tt>putAll(..)</tt>
-     *
-     * @param cacheCtx Cache context.
-     * @param keys Keys to enlist.
-     * @param expiryPlc Explicitly specified expiry policy for entry.
-     * @param lookup Value lookup map ({@code null} for remove).
-     * @param invokeMap Map with entry processors for invoke operation.
-     * @param invokeArgs Optional arguments for EntryProcessor.
-     * @param retval Flag indicating whether a value should be returned.
-     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
-     * @param filter User filters.
-     * @param ret Return value.
-     * @param enlisted Collection of keys enlisted into this transaction.
-     * @param drPutMap DR put map (optional).
-     * @param drRmvMap DR remove map (optional).
-     * @param skipStore Skip store flag.
-     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
-     * @param keepBinary Keep binary flag.
-     * @param dataCenterId Optional data center ID.
-     * @return Future for missing values loading.
-     */
-    private <K, V> IgniteInternalFuture<Void> enlistWrite(
-        final GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Collection<?> keys,
-        @Nullable ExpiryPolicy expiryPlc,
-        @Nullable Map<?, ?> lookup,
-        @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
-        @Nullable Object[] invokeArgs,
-        final boolean retval,
-        boolean lockOnly,
-        final CacheEntryPredicate[] filter,
-        final GridCacheReturn ret,
-        Collection<KeyCacheObject> enlisted,
-        @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
-        @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
-        boolean skipStore,
-        final boolean singleRmv,
-        final boolean keepBinary,
-        Byte dataCenterId
-    ) {
-        assert retval || invokeMap == null;
-
-        try {
-            addActiveCache(cacheCtx);
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-
-        boolean rmv = lookup == null && invokeMap == null;
-
-        final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
-        final boolean needVal = singleRmv || retval || hasFilters;
-        final boolean needReadVer = needVal && (serializable() && optimistic());
-
-        try {
-            // Set transform flag for transaction.
-            if (invokeMap != null)
-                transform = true;
-
-            Set<KeyCacheObject> missedForLoad = null;
-
-            for (Object key : keys) {
-                if (key == null) {
-                    rollback();
-
-                    throw new NullPointerException("Null key.");
-                }
-
-                Object val = rmv || lookup == null ? null : lookup.get(key);
-                EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
-
-                GridCacheVersion drVer;
-                long drTtl;
-                long drExpireTime;
-
-                if (drPutMap != null) {
-                    GridCacheDrInfo info = drPutMap.get(key);
-
-                    assert info != null;
-
-                    drVer = info.version();
-                    drTtl = info.ttl();
-                    drExpireTime = info.expireTime();
-                }
-                else if (drRmvMap != null) {
-                    assert drRmvMap.get(key) != null;
-
-                    drVer = drRmvMap.get(key);
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-                else if (dataCenterId != null) {
-                    drVer = cctx.versions().next(dataCenterId);
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-                else {
-                    drVer = null;
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-
-                if (!rmv && val == null && entryProcessor == null) {
-                    setRollbackOnly();
-
-                    throw new NullPointerException("Null value.");
-                }
-
-                KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
-
-                boolean loadMissed = enlistWriteEntry(cacheCtx,
-                    entryTopVer,
-                    cacheKey,
-                    val,
-                    entryProcessor,
-                    invokeArgs,
-                    expiryPlc,
-                    retval,
-                    lockOnly,
-                    filter,
-                    drVer,
-                    drTtl,
-                    drExpireTime,
-                    ret,
-                    enlisted,
-                    skipStore,
-                    singleRmv,
-                    hasFilters,
-                    needVal,
-                    needReadVer,
-                    keepBinary);
-
-                if (loadMissed) {
-                    if (missedForLoad == null)
-                        missedForLoad = new HashSet<>();
-
-                    missedForLoad.add(cacheKey);
-                }
-            }
-
-            if (missedForLoad != null) {
-                AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
-                if (topVer == null)
-                    topVer = entryTopVer;
-
-                return loadMissing(cacheCtx,
-                    topVer != null ? topVer : topologyVersion(),
-                    missedForLoad,
-                    filter,
-                    ret,
-                    needReadVer,
-                    singleRmv,
-                    hasFilters,
-                    /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
-                    retval,
-                    keepBinary,
-                    expiryPlc);
-            }
-
-            return new GridFinishedFuture<>();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param keys Keys to load.
-     * @param filter Filter.
-     * @param ret Return value.
-     * @param needReadVer Read version flag.
-     * @param singleRmv {@code True} for single remove operation.
-     * @param hasFilters {@code True} if filters not empty.
-     * @param readThrough Read through flag.
-     * @param retval Return value flag.
-     * @param expiryPlc Expiry policy.
-     * @return Load future.
-     */
-    private IgniteInternalFuture<Void> loadMissing(
-        final GridCacheContext cacheCtx,
-        final AffinityTopologyVersion topVer,
-        final Set<KeyCacheObject> keys,
-        final CacheEntryPredicate[] filter,
-        final GridCacheReturn ret,
-        final boolean needReadVer,
-        final boolean singleRmv,
-        final boolean hasFilters,
-        final boolean readThrough,
-        final boolean retval,
-        final boolean keepBinary,
-        final ExpiryPolicy expiryPlc) {
-        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
-            new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
-                @Override public void apply(KeyCacheObject key,
-                    @Nullable Object val,
-                    @Nullable GridCacheVersion loadVer) {
-                    if (log.isDebugEnabled())
-                        log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
-
-                    IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
-
-                    assert e != null;
-
-                    if (needReadVer) {
-                        assert loadVer != null;
-
-                        e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
-                    }
-
-                    if (singleRmv) {
-                        assert !hasFilters && !retval;
-                        assert val == null || Boolean.TRUE.equals(val) : val;
-
-                        ret.set(cacheCtx, null, val != null, keepBinary);
-                    }
-                    else {
-                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
-                        if (e.op() == TRANSFORM) {
-                            GridCacheVersion ver;
-
-                            e.readValue(cacheVal);
-
-                            try {
-                                ver = e.cached().version();
-                            }
-                            catch (GridCacheEntryRemovedException ex) {
-                                assert optimistic() : e;
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
-
-                                ver = null;
-                            }
-
-                            addInvokeResult(e, cacheVal, ret, ver);
-                        }
-                        else {
-                            boolean success;
-
-                            if (hasFilters) {
-                                success = isAll(e.context(), key, cacheVal, filter);
-
-                                if (!success)
-                                    e.value(cacheVal, false, false);
-                            }
-                            else
-                                success = true;
-
-                            ret.set(cacheCtx, cacheVal, success, keepBinary);
-                        }
-                    }
-                }
-            };
-
-        return loadMissing(
-            cacheCtx,
-            topVer,
-            readThrough,
-            /*async*/true,
-            keys,
-            /*skipVals*/singleRmv,
-            needReadVer,
-            keepBinary,
-            expiryPlc,
-            c);
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param cacheKey Key.
-     * @param val Value.
-     * @param entryProcessor Entry processor.
-     * @param invokeArgs Optional arguments for EntryProcessor.
-     * @param expiryPlc Explicitly specified expiry policy for entry.
-     * @param retval Return value flag.
-     * @param lockOnly Lock only flag.
-     * @param filter Filter.
-     * @param drVer DR version.
-     * @param drTtl DR ttl.
-     * @param drExpireTime DR expire time.
-     * @param ret Return value.
-     * @param enlisted Enlisted keys collection.
-     * @param skipStore Skip store flag.
-     * @param singleRmv {@code True} for single remove operation.
-     * @param hasFilters {@code True} if filters not empty.
-     * @param needVal {@code True} if value is needed.
-     * @param needReadVer {@code True} if need read entry version.
-     * @return {@code True} if entry value should be loaded.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean enlistWriteEntry(GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        final KeyCacheObject cacheKey,
-        @Nullable final Object val,
-        @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
-        @Nullable final Object[] invokeArgs,
-        @Nullable final ExpiryPolicy expiryPlc,
-        final boolean retval,
-        final boolean lockOnly,
-        final CacheEntryPredicate[] filter,
-        final GridCacheVersion drVer,
-        final long drTtl,
-        long drExpireTime,
-        final GridCacheReturn ret,
-        @Nullable final Collection<KeyCacheObject> enlisted,
-        boolean skipStore,
-        boolean singleRmv,
-        boolean hasFilters,
-        final boolean needVal,
-        boolean needReadVer,
-        boolean keepBinary
-    ) throws IgniteCheckedException {
-        boolean loadMissed = false;
-
-        final boolean rmv = val == null && entryProcessor == null;
-
-        IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
-        IgniteTxEntry txEntry = entry(txKey);
-
-        // First time access.
-        if (txEntry == null) {
-            while (true) {
-                GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
-
-                try {
-                    entry.unswap(false);
-
-                    // Check if lock is being explicitly acquired by the same thread.
-                    if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
-                        entry.lockedByThread(threadId, xidVer)) {
-                        throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
-                            "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
-                            ", entry=" + entry +
-                            ", xidVer=" + xidVer +
-                            ", threadId=" + threadId +
-                            ", locNodeId=" + cctx.localNodeId() + ']');
-                    }
-
-                    CacheObject old = null;
-                    GridCacheVersion readVer = null;
-
-                    if (optimistic() && !implicit()) {
-                        try {
-                            if (needReadVer) {
-                                EntryGetResult res = primaryLocal(entry) ?
-                                    entry.innerGetVersioned(
-                                        null,
-                                        this,
-                                        /*swap*/false,
-                                        /*unmarshal*/retval || needVal,
-                                        /*metrics*/retval,
-                                        /*events*/retval,
-                                        CU.subjectId(this, cctx),
-                                        entryProcessor,
-                                        resolveTaskName(),
-                                        null,
-                                        keepBinary,
-                                        null) : null;
-
-                                if (res != null) {
-                                    old = res.value();
-                                    readVer = res.version();
-                                }
-                            }
-                            else {
-                                old = entry.innerGet(
-                                    null,
-                                    this,
-                                    /*swap*/false,
-                                    /*read-through*/false,
-                                    /*metrics*/retval,
-                                    /*events*/retval,
-                                    /*temporary*/false,
-                                    CU.subjectId(this, cctx),
-                                    entryProcessor,
-                                    resolveTaskName(),
-                                    null,
-                                    keepBinary);
-                            }
-                        }
-                        catch (ClusterTopologyCheckedException e) {
-                            entry.context().evicts().touch(entry, topologyVersion());
-
-                            throw e;
-                        }
-                    }
-                    else
-                        old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
-
-                    final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
-                        entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
-
-                    if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
-                        ret.set(cacheCtx, old, false, keepBinary);
-
-                        if (!readCommitted()) {
-                            if (optimistic() && serializable()) {
-                                txEntry = addEntry(op,
-                                    old,
-                                    entryProcessor,
-                                    invokeArgs,
-                                    entry,
-                                    expiryPlc,
-                                    filter,
-                                    true,
-                                    drTtl,
-                                    drExpireTime,
-                                    drVer,
-                                    skipStore,
-                                    keepBinary);
-                            }
-                            else {
-                                txEntry = addEntry(READ,
-                                    old,
-                                    null,
-                                    null,
-                                    entry,
-                                    null,
-                                    CU.empty0(),
-                                    false,
-                                    -1L,
-                                    -1L,
-                                    null,
-                                    skipStore,
-                                    keepBinary);
-                            }
-
-                            txEntry.markValid();
-
-                            if (needReadVer) {
-                                assert readVer != null;
-
-                                txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
-                            }
-                        }
-
-                        if (readCommitted())
-                            cacheCtx.evicts().touch(entry, topologyVersion());
-
-                        break; // While.
-                    }
-
-                    txEntry = addEntry(op,
-                        cacheCtx.toCacheObject(val),
-                        entryProcessor,
-                        invokeArgs,
-                        entry,
-                        expiryPlc,
-                        filter,
-                        true,
-                        drTtl,
-                        drExpireTime,
-                        drVer,
-                        skipStore,
-                        keepBinary);
-
-                    if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
-                        cacheCtx.evicts().touch(entry, topologyVersion());
-
-                    if (enlisted != null)
-                        enlisted.add(cacheKey);
-
-                    if (!pessimistic() && !implicit()) {
-                        txEntry.markValid();
-
-                        if (old == null) {
-                            if (needVal)
-                                loadMissed = true;
-                            else {
-                                assert !implicit() || !transform : this;
-                                assert txEntry.op() != TRANSFORM : txEntry;
-
-                                if (retval)
-                                    ret.set(cacheCtx, null, true, keepBinary);
-                                else
-                                    ret.success(true);
-                            }
-                        }
-                        else {
-                            if (needReadVer) {
-                                assert readVer != null;
-
-                                txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
-                            }
-
-                            if (retval && !transform)
-                                ret.set(cacheCtx, old, true, keepBinary);
-                            else {
-                                if (txEntry.op() == TRANSFORM) {
-                                    GridCacheVersion ver;
-
-                                    try {
-                                        ver = entry.version();
-                                    }
-                                    catch (GridCacheEntryRemovedException ex) {
-                                        assert optimistic() : txEntry;
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Failed to get entry version " +
-                                                "[err=" + ex.getMessage() + ']');
-
-                                        ver = null;
-                                    }
-
-                                    addInvokeResult(txEntry, old, ret, ver);
-                                }
-                                else
-                                    ret.success(true);
-                            }
-                        }
-                    }
-                    // Pessimistic.
-                    else {
-                        if (retval && !transform)
-                            ret.set(cacheCtx, old, true, keepBinary);
-                        else
-                            ret.success(true);
-                    }
-
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in transaction putAll0 method: " + entry);
-                }
-            }
-        }
-        else {
-            if (entryProcessor == null && txEntry.op() == TRANSFORM)
-                throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
-                    "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
-
-            GridCacheEntryEx entry = txEntry.cached();
-
-            CacheObject v = txEntry.value();
-
-            boolean del = txEntry.op() == DELETE && rmv;
-
-            if (!del) {
-                if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
-                    ret.set(cacheCtx, v, false, keepBinary);
-
-                    return loadMissed;
-                }
-
-                GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
-                    v != null ? UPDATE : CREATE;
-
-                txEntry = addEntry(op,
-                    cacheCtx.toCacheObject(val),
-                    entryProcessor,
-                    invokeArgs,
-                    entry,
-                    expiryPlc,
-                    filter,
-                    true,
-                    drTtl,
-                    drExpireTime,
-                    drVer,
-                    skipStore,
-                    keepBinary);
-
-                if (enlisted != null)
-                    enlisted.add(cacheKey);
-
-                if (txEntry.op() == TRANSFORM) {
-                    GridCacheVersion ver;
-
-                    try {
-                        ver = entry.version();
-                    }
-                    catch (GridCacheEntryRemovedException e) {
-                        assert optimistic() : txEntry;
-
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
-
-                        ver = null;
-                    }
-
-                    addInvokeResult(txEntry, txEntry.value(), ret, ver);
-                }
-            }
-
-            if (!pessimistic()) {
-                txEntry.markValid();
-
-                if (retval && !transform)
-                    ret.set(cacheCtx, v, true, keepBinary);
-                else
-                    ret.success(true);
-            }
-        }
-
-        return loadMissed;
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param key Key.
-     * @param val Value.
-     * @param filter Filter.
-     * @return {@code True} if filter passed.
-     */
-    private boolean isAll(GridCacheContext cctx,
-        KeyCacheObject key,
-        CacheObject val,
-        CacheEntryPredicate[] filter) {
-        GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
-            @Nullable @Override public CacheObject peekVisibleValue() {
-                return rawGet();
-            }
-        };
-
-        for (CacheEntryPredicate p0 : filter) {
-            if (p0 != null && !p0.apply(e))
-                return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * Post lock processing for put or remove.
-     *
-     * @param cacheCtx Context.
-     * @param keys Keys.
-     * @param ret Return value.
-     * @param rmv {@code True} if remove.
-     * @param retval Flag to return value or not.
-     * @param read {@code True} if read.
-     * @param accessTtl TTL for read operation.
-     * @param filter Filter to check entries.
-     * @throws IgniteCheckedException If error.
-     * @param computeInvoke If {@code true} computes return value for invoke operation.
-     */
-    @SuppressWarnings("unchecked")
-    protected final void postLockWrite(
-        GridCacheContext cacheCtx,
-        Iterable<KeyCacheObject> keys,
-        GridCacheReturn ret,
-        boolean rmv,
-        boolean retval,
-        boolean read,
-        long accessTtl,
-        CacheEntryPredicate[] filter,
-        boolean computeInvoke
-    ) throws IgniteCheckedException {
-        for (KeyCacheObject k : keys) {
-            IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
-
-            if (txEntry == null)
-                throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
-                    "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
-
-            while (true) {
-                GridCacheEntryEx cached = txEntry.cached();
-
-                try {
-                    assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
-                        "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
-                            ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
-
-                    if (log.isDebugEnabled())
-                        log.debug("Post lock write entry: " + cached);
-
-                    CacheObject v = txEntry.previousValue();
-                    boolean hasPrevVal = txEntry.hasPreviousValue();
-
-                    if (onePhaseCommit())
-                        filter = txEntry.filters();
-
-                    // If we have user-passed filter, we must read value into entry for peek().
-                    if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
-                        retval = true;
-
-                    boolean invoke = txEntry.op() == TRANSFORM;
-
-                    if (retval || invoke) {
-                        if (!cacheCtx.isNear()) {
-                            if (!hasPrevVal) {
-                                // For non-local cache should read from store after lock on primary.
-                                boolean readThrough = cacheCtx.isLocal() &&
-                                    (invoke || cacheCtx.loadPreviousValue()) &&
-                                    !txEntry.skipStore();
-
-                                v = cached.innerGet(
-                                    null,
-                                    this,
-                                    /*swap*/true,
-                                    readThrough,
-                                    /*metrics*/!invoke,
-                                    /*event*/!invoke && !dht(),
-                                    /*temporary*/false,
-                                    CU.subjectId(this, cctx),
-                                    null,
-                                    resolveTaskName(),
-                                    null,
-                                    txEntry.keepBinary());
-                            }
-                        }
-                        else {
-                            if (!hasPrevVal)
-                                v = cached.rawGetOrUnmarshal(false);
-                        }
-
-                        if (txEntry.op() == TRANSFORM) {
-                            if (computeInvoke) {
-                                GridCacheVersion ver;
-
-                                try {
-                                    ver = cached.version();
-                                }
-                                catch (GridCacheEntryRemovedException e) {
-                                    assert optimistic() : txEntry;
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
-
-                                    ver = null;
-                                }
-
-                                addInvokeResult(txEntry, v, ret, ver);
-                            }
-                        }
-                        else
-                            ret.value(cacheCtx, v, txEntry.keepBinary());
-                    }
-
-                    boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
-
-                    // For remove operation we return true only if we are removing s/t,
-                    // i.e. cached value is not null.
-                    ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
-
-                    if (onePhaseCommit())
-                        txEntry.filtersPassed(pass);
-
-                    boolean updateTtl = read;
-
-                    if (pass) {
-                        txEntry.markValid();
-
-                        if (log.isDebugEnabled())
-                            log.debug("Filter passed in post lock for key: " + k);
-                    }
-                    else {
-                        // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
-                        txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
-                        txEntry.filters(CU.empty0());
-                        txEntry.filtersSet(false);
-
-                        updateTtl = !cacheCtx.putIfAbsentFilter(filter);
-                    }
-
-                    if

<TRUNCATED>