You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/22 00:04:34 UTC

[41/46] incubator-ignite git commit: GG-9141 - Renaming.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
deleted file mode 100644
index 003315b..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ /dev/null
@@ -1,3178 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.security.*;
-import org.apache.ignite.portables.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
-import org.gridgain.grid.kernal.processors.cache.dr.*;
-import org.gridgain.grid.kernal.processors.dr.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.future.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
-import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
-
-/**
- * Transaction adapter for cache transactions.
- */
-public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K, V>
-    implements GridCacheTxLocalEx<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Per-transaction read map. */
-    @GridToStringExclude
-    protected Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> txMap;
-
-    /** Read view on transaction map. */
-    @GridToStringExclude
-    protected GridCacheTxMap<K, V> readView;
-
-    /** Write view on transaction map. */
-    @GridToStringExclude
-    protected GridCacheTxMap<K, V> writeView;
-
-    /** Minimal version encountered (either explicit lock or XID of this transaction). */
-    protected GridCacheVersion minVer;
-
-    /** Flag indicating with TM commit happened. */
-    protected AtomicBoolean doneFlag = new AtomicBoolean(false);
-
-    /** Committed versions, relative to base. */
-    private Collection<GridCacheVersion> committedVers = Collections.emptyList();
-
-    /** Rolled back versions, relative to base. */
-    private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList();
-
-    /** Base for completed versions. */
-    private GridCacheVersion completedBase;
-
-    /** Flag indicating partition lock in group lock transaction. */
-    private boolean partLock;
-
-    /** Flag indicating that transformed values should be sent to remote nodes. */
-    private boolean sndTransformedVals;
-
-    /** Commit error. */
-    protected AtomicReference<Throwable> commitErr = new AtomicReference<>();
-
-    /** Active cache IDs. */
-    protected Set<Integer> activeCacheIds = new HashSet<>();
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    protected GridCacheTxLocalAdapter() {
-        // No-op.
-    }
-
-    /**
-     * @param cctx Cache registry.
-     * @param xidVer Transaction ID.
-     * @param implicit {@code True} if transaction was implicitly started by the system,
-     *      {@code false} if it was started explicitly by user.
-     * @param implicitSingle {@code True} if transaction is implicit with only one key.
-     * @param sys System flag.
-     * @param concurrency Concurrency.
-     * @param isolation Isolation.
-     * @param timeout Timeout.
-     * @param txSize Expected transaction size.
-     * @param grpLockKey Group lock key if this is a group-lock transaction.
-     * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
-     */
-    protected GridCacheTxLocalAdapter(
-        GridCacheSharedContext<K, V> cctx,
-        GridCacheVersion xidVer,
-        boolean implicit,
-        boolean implicitSingle,
-        boolean sys,
-        IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation,
-        long timeout,
-        boolean invalidate,
-        boolean storeEnabled,
-        int txSize,
-        @Nullable GridCacheTxKey grpLockKey,
-        boolean partLock,
-        @Nullable UUID subjId,
-        int taskNameHash
-    ) {
-        super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
-            storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
-
-        assert !partLock || grpLockKey != null;
-
-        this.partLock = partLock;
-
-        minVer = xidVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID eventNodeId() {
-        return cctx.localNodeId();
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID originatingNodeId() {
-        return cctx.localNodeId();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean empty() {
-        return txMap.isEmpty();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> masterNodeIds() {
-        return Collections.singleton(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean partitionLock() {
-        return partLock;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Throwable commitError() {
-        return commitErr.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void commitError(Throwable e) {
-        commitErr.compareAndSet(null, e);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
-        assert false;
-        return false;
-    }
-
-    /**
-     * Gets collection of active cache IDs for this transaction.
-     *
-     * @return Collection of active cache IDs.
-     */
-    @Override public Collection<Integer> activeCacheIds() {
-        return activeCacheIds;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isStarted() {
-        return txMap != null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasWriteKey(GridCacheTxKey<K> key) {
-        return writeView.containsKey(key);
-    }
-
-    /**
-     * @return Transaction read set.
-     */
-    @Override public Set<GridCacheTxKey<K>> readSet() {
-        return txMap == null ? Collections.<GridCacheTxKey<K>>emptySet() : readView.keySet();
-    }
-
-    /**
-     * @return Transaction write set.
-     */
-    @Override public Set<GridCacheTxKey<K>> writeSet() {
-        return txMap == null ? Collections.<GridCacheTxKey<K>>emptySet() : writeView.keySet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removed(GridCacheTxKey<K> key) {
-        if (txMap == null)
-            return false;
-
-        GridCacheTxEntry<K, V> e = txMap.get(key);
-
-        return e != null && e.op() == DELETE;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> readMap() {
-        return readView == null ? Collections.<GridCacheTxKey<K>, GridCacheTxEntry<K, V>>emptyMap() : readView;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> writeMap() {
-        return writeView == null ? Collections.<GridCacheTxKey<K>, GridCacheTxEntry<K, V>>emptyMap() : writeView;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridCacheTxEntry<K, V>> allEntries() {
-        return txMap == null ? Collections.<GridCacheTxEntry<K, V>>emptySet() : txMap.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridCacheTxEntry<K, V>> readEntries() {
-        return readView == null ? Collections.<GridCacheTxEntry<K, V>>emptyList() : readView.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridCacheTxEntry<K, V>> writeEntries() {
-        return writeView == null ? Collections.<GridCacheTxEntry<K, V>>emptyList() : writeView.values();
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public GridCacheTxEntry<K, V> entry(GridCacheTxKey<K> key) {
-        return txMap == null ? null : txMap.get(key);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void seal() {
-        if (readView != null)
-            readView.seal();
-
-        if (writeView != null)
-            writeView.seal();
-    }
-
-    /**
-     * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
-     * to remote nodes.
-     */
-    public void sendTransformedValues(boolean snd) {
-        sndTransformedVals = snd;
-    }
-
-    /**
-     * @return {@code True} if should be committed after lock is acquired.
-     */
-    protected boolean commitAfterLock() {
-        return implicit() && (!dht() || colocated());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable @Override public GridTuple<V> peek(
-        GridCacheContext<K, V> cacheCtx,
-        boolean failFast,
-        K key,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter
-    ) throws GridCacheFilterFailedException {
-        GridCacheTxEntry<K, V> e = txMap == null ? null : txMap.get(cacheCtx.txKey(key));
-
-        if (e != null) {
-            // We should look at tx entry previous value. If this is a user peek then previous
-            // value is the same as value. If this is a filter evaluation peek then previous value holds
-            // value visible to filter while value contains value enlisted for write.
-            if (!F.isAll(e.cached().wrap(false), filter))
-                return e.hasPreviousValue() ? F.t(CU.<V>failed(failFast, e.previousValue())) : null;
-
-            return e.hasPreviousValue() ? F.t(e.previousValue()) : null;
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<Boolean> loadMissing(
-        final GridCacheContext<K, V> cacheCtx,
-        boolean async,
-        final Collection<? extends K> keys,
-        boolean deserializePortable,
-        final IgniteBiInClosure<K, V> c
-    ) {
-        if (!async) {
-            try {
-                return new GridFinishedFuture<>(cctx.kernalContext(),
-                    cacheCtx.store().loadAllFromStore(this, keys, c));
-            }
-            catch (IgniteCheckedException e) {
-                return new GridFinishedFuture<>(cctx.kernalContext(), e);
-            }
-        }
-        else
-            return cctx.kernalContext().closure().callLocalSafe(
-                new GPC<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return cacheCtx.store().loadAllFromStore(GridCacheTxLocalAdapter.this, keys, c);
-                    }
-                },
-                true);
-    }
-
-    /**
-     * Gets minimum version present in transaction.
-     *
-     * @return Minimum versions.
-     */
-    @Override public GridCacheVersion minVersion() {
-        return minVer;
-    }
-
-    /**
-     * @throws IgniteCheckedException If prepare step failed.
-     */
-    @SuppressWarnings({"CatchGenericClass"})
-    public void userPrepare() throws IgniteCheckedException {
-        if (state() != PREPARING) {
-            if (timedOut())
-                throw new IgniteTxTimeoutException("Transaction timed out: " + this);
-
-            IgniteTxState state = state();
-
-            setRollbackOnly();
-
-            throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']');
-        }
-
-        checkValid();
-
-        try {
-            cctx.tm().prepareTx(this);
-        }
-        catch (IgniteCheckedException e) {
-            throw e;
-        }
-        catch (Throwable e) {
-            setRollbackOnly();
-
-            throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void commit() throws IgniteCheckedException {
-        try {
-            commitAsync().get();
-        }
-        finally {
-            cctx.tm().txContextReset();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepare() throws IgniteCheckedException {
-        prepareAsync().get();
-    }
-
-    /**
-     * Checks that locks are in proper state for commit.
-     *
-     * @param entry Cache entry to check.
-     */
-    private void checkCommitLocks(GridCacheEntryEx<K, V> entry) {
-        assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
-            ", tx=" + this + ']';
-    }
-
-    /**
-     * Uncommits transaction by invalidating all of its entries.
-     */
-    @SuppressWarnings({"CatchGenericClass"})
-    private void uncommit() {
-        for (GridCacheTxEntry<K, V> e : writeMap().values()) {
-            try {
-                GridCacheEntryEx<K, V> cacheEntry = e.cached();
-
-                if (e.op() != NOOP)
-                    cacheEntry.invalidate(null, xidVer);
-            }
-            catch (Throwable t) {
-                U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);
-
-                break;
-            }
-        }
-
-        cctx.tm().uncommitTx(this);
-    }
-
-    /**
-     * Gets cache entry for given key.
-     *
-     * @param key Key.
-     * @return Cache entry.
-     */
-    protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key) {
-        return cacheCtx.cache().entryEx(key.key());
-    }
-
-    /**
-     * Gets cache entry for given key and topology version.
-     *
-     * @param key Key.
-     * @param topVer Topology version.
-     * @return Cache entry.
-     */
-    protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key, long topVer) {
-        return cacheCtx.cache().entryEx(key.key(), topVer);
-    }
-
-    /**
-     * Performs batch database operations. This commit must be called
-     * before {@link #userCommit()}. This way if there is a DB failure,
-     * cache transaction can still be rolled back.
-     *
-     * @param writeEntries Transaction write set.
-     * @throws IgniteCheckedException If batch update failed.
-     */
-    @SuppressWarnings({"CatchGenericClass"})
-    protected void batchStoreCommit(Iterable<GridCacheTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
-        GridCacheStoreManager<K, V> store = store();
-
-        if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
-            try {
-                if (writeEntries != null) {
-                    Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
-                    List<K> rmvCol = null;
-
-                    boolean skipNear = near() && store.writeToStoreFromDht();
-
-                    for (GridCacheTxEntry<K, V> e : writeEntries) {
-                        if (skipNear && e.cached().isNear())
-                            continue;
-
-                        boolean intercept = e.context().config().getInterceptor() != null;
-
-                        if (intercept || !F.isEmpty(e.transformClosures()))
-                            e.cached().unswap(true, false);
-
-                        GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
-
-                        GridCacheContext<K, V> cacheCtx = e.context();
-
-                        GridCacheOperation op = res.get1();
-                        K key = e.key();
-                        V val = res.get2();
-                        GridCacheVersion ver = writeVersion();
-
-                        if (op == CREATE || op == UPDATE) {
-                            // Batch-process all removes if needed.
-                            if (rmvCol != null && !rmvCol.isEmpty()) {
-                                store.removeAllFromStore(this, rmvCol);
-
-                                // Reset.
-                                rmvCol.clear();
-                            }
-
-                            if (intercept) {
-                                V old = e.cached().rawGetOrUnmarshal(true);
-
-                                val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
-
-                                if (val == null)
-                                    continue;
-
-                                val = cacheCtx.unwrapTemporary(val);
-                            }
-
-                            if (putMap == null)
-                                putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
-
-                            putMap.put(key, F.t(val, ver));
-                        }
-                        else if (op == DELETE) {
-                            // Batch-process all puts if needed.
-                            if (putMap != null && !putMap.isEmpty()) {
-                                store.putAllToStore(this, putMap);
-
-                                // Reset.
-                                putMap.clear();
-                            }
-
-                            if (intercept) {
-                                V old = e.cached().rawGetOrUnmarshal(true);
-
-                                IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor()
-                                    .onBeforeRemove(key, old);
-
-                                if (cacheCtx.cancelRemove(t))
-                                    continue;
-                            }
-
-                            if (rmvCol == null)
-                                rmvCol = new LinkedList<>();
-
-                            rmvCol.add(key);
-                        }
-                        else if (log.isDebugEnabled())
-                            log.debug("Ignoring NOOP entry for batch store commit: " + e);
-                    }
-
-                    if (putMap != null && !putMap.isEmpty()) {
-                        assert rmvCol == null || rmvCol.isEmpty();
-
-                        // Batch put at the end of transaction.
-                        store.putAllToStore(this, putMap);
-                    }
-
-                    if (rmvCol != null && !rmvCol.isEmpty()) {
-                        assert putMap == null || putMap.isEmpty();
-
-                        // Batch remove at the end of transaction.
-                        store.removeAllFromStore(this, rmvCol);
-                    }
-                }
-
-                // Commit while locks are held.
-                store.txEnd(this, true);
-            }
-            catch (IgniteCheckedException ex) {
-                commitError(ex);
-
-                setRollbackOnly();
-
-                // Safe to remove transaction from committed tx list because nothing was committed yet.
-                cctx.tm().removeCommittedTx(this);
-
-                throw ex;
-            }
-            catch (Throwable ex) {
-                commitError(ex);
-
-                setRollbackOnly();
-
-                // Safe to remove transaction from committed tx list because nothing was committed yet.
-                cctx.tm().removeCommittedTx(this);
-
-                throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CatchGenericClass"})
-    @Override public void userCommit() throws IgniteCheckedException {
-        IgniteTxState state = state();
-
-        if (state != COMMITTING) {
-            if (timedOut())
-                throw new IgniteTxTimeoutException("Transaction timed out: " + this);
-
-            setRollbackOnly();
-
-            throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
-        }
-
-        checkValid();
-
-        boolean empty = F.isEmpty(near() ? txMap : writeMap());
-
-        // Register this transaction as completed prior to write-phase to
-        // ensure proper lock ordering for removed entries.
-        // We add colocated transaction to committed set even if it is empty to correctly order
-        // locks on backup nodes.
-        if (!empty || colocated())
-            cctx.tm().addCommittedTx(this);
-
-        if (groupLock())
-            addGroupTxMapping(writeSet());
-
-        if (!empty) {
-            // We are holding transaction-level locks for entries here, so we can get next write version.
-            writeVersion(cctx.versions().next(topologyVersion()));
-
-            batchStoreCommit(writeMap().values());
-
-            try {
-                cctx.tm().txContext(this);
-
-                long topVer = topologyVersion();
-
-                /*
-                 * Commit to cache. Note that for 'near' transaction we loop through all the entries.
-                 */
-                for (GridCacheTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {
-                    GridCacheContext<K, V> cacheCtx = txEntry.context();
-
-                    GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
-
-                    UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
-
-                    try {
-                        while (true) {
-                            try {
-                                GridCacheEntryEx<K, V> cached = txEntry.cached();
-
-                                // Must try to evict near entries before committing from
-                                // transaction manager to make sure locks are held.
-                                if (!evictNearEntry(txEntry, false)) {
-                                    if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
-                                        cached.markObsolete(xidVer);
-
-                                        break;
-                                    }
-
-                                    if (cached.detached())
-                                        break;
-
-                                    GridCacheEntryEx<K, V> nearCached = null;
-
-                                    boolean metrics = true;
-
-                                    if (updateNearCache(cacheCtx, txEntry.key(), topVer))
-                                        nearCached = cacheCtx.dht().near().peekEx(txEntry.key());
-                                    else if (cacheCtx.isNear() && txEntry.locallyMapped())
-                                        metrics = false;
-
-                                    boolean evt = !isNearLocallyMapped(txEntry, false);
-
-                                    // For near local transactions we must record DHT version
-                                    // in order to keep near entries on backup nodes until
-                                    // backup remote transaction completes.
-                                    if (cacheCtx.isNear())
-                                        ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
-
-                                    if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
-                                        txEntry.cached().unswap(true, false);
-
-                                    GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
-                                        true);
-
-                                    GridCacheOperation op = res.get1();
-                                    V val = res.get2();
-                                    byte[] valBytes = res.get3();
-
-                                    // Preserve TTL if needed.
-                                    if (txEntry.ttl() < 0)
-                                        txEntry.ttl(cached.ttl());
-
-                                    // Deal with DR conflicts.
-                                    GridCacheVersion explicitVer = txEntry.drVersion() != null ?
-                                        txEntry.drVersion() : writeVersion();
-
-                                    GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached,
-                                        txEntry,
-                                        explicitVer,
-                                        op,
-                                        val,
-                                        valBytes,
-                                        txEntry.ttl(),
-                                        txEntry.drExpireTime());
-
-                                    if (drRes != null) {
-                                        op = drRes.operation();
-                                        val = drRes.value();
-                                        valBytes = drRes.valueBytes();
-
-                                        if (drRes.isMerge())
-                                            explicitVer = writeVersion();
-                                    }
-                                    else
-                                        // Nullify explicit version so that innerSet/innerRemove will work as usual.
-                                        explicitVer = null;
-
-                                    if (sndTransformedVals || (drRes != null)) {
-                                        assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null);
-
-                                        txEntry.value(val, true, false);
-                                        txEntry.valueBytes(valBytes);
-                                        txEntry.op(op);
-                                        txEntry.transformClosures(null);
-                                        txEntry.drVersion(explicitVer);
-                                    }
-
-                                    if (op == CREATE || op == UPDATE) {
-                                        GridCacheUpdateTxResult<V> updRes = cached.innerSet(
-                                            this,
-                                            eventNodeId(),
-                                            txEntry.nodeId(),
-                                            val,
-                                            valBytes,
-                                            false,
-                                            false,
-                                            txEntry.ttl(),
-                                            evt,
-                                            metrics,
-                                            topVer,
-                                            txEntry.filters(),
-                                            cached.detached() ? DR_NONE : drType,
-                                            txEntry.drExpireTime(),
-                                            cached.isNear() ? null : explicitVer,
-                                            CU.subjectId(this, cctx),
-                                            resolveTaskName());
-
-                                        if (nearCached != null && updRes.success())
-                                            nearCached.innerSet(
-                                                null,
-                                                eventNodeId(),
-                                                nodeId,
-                                                val,
-                                                valBytes,
-                                                false,
-                                                false,
-                                                txEntry.ttl(),
-                                                false,
-                                                metrics,
-                                                topVer,
-                                                CU.<K, V>empty(),
-                                                DR_NONE,
-                                                txEntry.drExpireTime(),
-                                                null,
-                                                CU.subjectId(this, cctx),
-                                                resolveTaskName());
-                                    }
-                                    else if (op == DELETE) {
-                                        GridCacheUpdateTxResult<V> updRes = cached.innerRemove(
-                                            this,
-                                            eventNodeId(),
-                                            txEntry.nodeId(),
-                                            false,
-                                            false,
-                                            evt,
-                                            metrics,
-                                            topVer,
-                                            txEntry.filters(),
-                                            cached.detached()  ? DR_NONE : drType,
-                                            cached.isNear() ? null : explicitVer,
-                                            CU.subjectId(this, cctx),
-                                            resolveTaskName());
-
-                                        if (nearCached != null && updRes.success())
-                                            nearCached.innerRemove(
-                                                null,
-                                                eventNodeId(),
-                                                nodeId,
-                                                false,
-                                                false,
-                                                false,
-                                                metrics,
-                                                topVer,
-                                                CU.<K, V>empty(),
-                                                DR_NONE,
-                                                null,
-                                                CU.subjectId(this, cctx),
-                                                resolveTaskName());
-                                    }
-                                    else if (op == RELOAD) {
-                                        cached.innerReload(CU.<K, V>empty());
-
-                                        if (nearCached != null)
-                                            nearCached.innerReload(CU.<K, V>empty());
-                                    }
-                                    else if (op == READ) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Ignoring READ entry when committing: " + txEntry);
-                                    }
-                                    else {
-                                        assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
-                                            "Transaction does not own lock for group lock entry during  commit [tx=" +
-                                                this + ", txEntry=" + txEntry + ']';
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Ignoring NOOP entry when committing: " + txEntry);
-                                    }
-                                }
-
-                                // Check commit locks after set, to make sure that
-                                // we are not changing obsolete entries.
-                                // (innerSet and innerRemove will throw an exception
-                                // if an entry is obsolete).
-                                if (txEntry.op() != READ && !txEntry.groupLockEntry())
-                                    checkCommitLocks(cached);
-
-                                // Break out of while loop.
-                                break;
-                            }
-                            // If entry cached within transaction got removed.
-                            catch (GridCacheEntryRemovedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
-
-                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes());
-                            }
-                        }
-                    }
-                    catch (Throwable ex) {
-                        // We are about to initiate transaction rollback when tx has started to committing.
-                        // Need to remove version from committed list.
-                        cctx.tm().removeCommittedTx(this);
-
-                        if (X.hasCause(ex, GridCacheIndexUpdateException.class) && cacheCtx.cache().isMongoDataCache()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to update mongo document index (transaction entry will " +
-                                    "be ignored): " + txEntry);
-
-                            // Set operation to NOOP.
-                            txEntry.op(NOOP);
-
-                            setRollbackOnly();
-
-                            throw ex;
-                        }
-                        else {
-                            IgniteCheckedException err = new IgniteTxHeuristicException("Failed to locally write to cache " +
-                                "(all transaction entries will be invalidated, however there was a window when " +
-                                "entries for this transaction were visible to others): " + this, ex);
-
-                            U.error(log, "Heuristic transaction failure.", err);
-
-                            commitErr.compareAndSet(null, err);
-
-                            state(UNKNOWN);
-
-                            try {
-                                // Courtesy to minimize damage.
-                                uncommit();
-                            }
-                            catch (Throwable ex1) {
-                                U.error(log, "Failed to uncommit transaction: " + this, ex1);
-                            }
-
-                            throw err;
-                        }
-                    }
-                }
-            }
-            finally {
-                cctx.tm().txContextReset();
-            }
-        }
-        else {
-            GridCacheStoreManager<K, V> store = store();
-
-            if (store != null && (!internal() || groupLock())) {
-                try {
-                    store.txEnd(this, true);
-                }
-                catch (IgniteCheckedException e) {
-                    commitError(e);
-
-                    setRollbackOnly();
-
-                    cctx.tm().removeCommittedTx(this);
-
-                    throw e;
-                }
-            }
-        }
-
-        // Do not unlock transaction entries if one-phase commit.
-        if (!onePhaseCommit()) {
-            if (doneFlag.compareAndSet(false, true)) {
-                // Unlock all locks.
-                cctx.tm().commitTx(this);
-
-                boolean needsCompletedVersions = needsCompletedVersions();
-
-                assert !needsCompletedVersions || completedBase != null;
-                assert !needsCompletedVersions || committedVers != null;
-                assert !needsCompletedVersions || rolledbackVers != null;
-            }
-        }
-    }
-
-    /**
-     * Commits transaction to transaction manager. Used for one-phase commit transactions only.
-     */
-    public void tmCommit() {
-        assert onePhaseCommit();
-
-        if (doneFlag.compareAndSet(false, true)) {
-            // Unlock all locks.
-            cctx.tm().commitTx(this);
-
-            state(COMMITTED);
-
-            boolean needsCompletedVersions = needsCompletedVersions();
-
-            assert !needsCompletedVersions || completedBase != null;
-            assert !needsCompletedVersions || committedVers != null;
-            assert !needsCompletedVersions || rolledbackVers != null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void completedVersions(
-        GridCacheVersion completedBase,
-        Collection<GridCacheVersion> committedVers,
-        Collection<GridCacheVersion> rolledbackVers) {
-        this.completedBase = completedBase;
-        this.committedVers = committedVers;
-        this.rolledbackVers = rolledbackVers;
-    }
-
-    /**
-     * @return Completed base for ordering.
-     */
-    public GridCacheVersion completedBase() {
-        return completedBase;
-    }
-
-    /**
-     * @return Committed versions.
-     */
-    public Collection<GridCacheVersion> committedVersions() {
-        return committedVers;
-    }
-
-    /**
-     * @return Rolledback versions.
-     */
-    public Collection<GridCacheVersion> rolledbackVersions() {
-        return rolledbackVers;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void userRollback() throws IgniteCheckedException {
-        IgniteTxState state = state();
-
-        if (state != ROLLING_BACK && state != ROLLED_BACK) {
-            setRollbackOnly();
-
-            throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']',
-                commitErr.get());
-        }
-
-        if (doneFlag.compareAndSet(false, true)) {
-            try {
-                if (near())
-                    // Must evict near entries before rolling back from
-                    // transaction manager, so they will be removed from cache.
-                    for (GridCacheTxEntry<K, V> e : allEntries())
-                        evictNearEntry(e, false);
-
-                cctx.tm().rollbackTx(this);
-
-                GridCacheStoreManager<K, V> store = store();
-
-                if (store != null && (near() || store.writeToStoreFromDht())) {
-                    if (!internal() || groupLock())
-                        store.txEnd(this, false);
-                }
-            }
-            catch (Error | IgniteCheckedException | RuntimeException e) {
-                U.addLastCause(e, commitErr.get(), log);
-
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * Checks if there is a cached or swapped value for
-     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
-     *
-     *
-     * @param keys Key to enlist.
-     * @param cached Cached entry, if called from entry wrapper.
-     * @param map Return map.
-     * @param missed Map of missed keys.
-     * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
-     * @param deserializePortable Deserialize portable flag.
-     * @param filter Filter to test.
-     * @throws IgniteCheckedException If failed.
-     * @return Enlisted keys.
-     */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    private Collection<K> enlistRead(
-        final GridCacheContext<K, V> cacheCtx,
-        Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached,
-        Map<K, V> map,
-        Map<K, GridCacheVersion> missed,
-        int keysCnt,
-        boolean deserializePortable,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
-        assert !F.isEmpty(keys);
-        assert keysCnt == keys.size();
-        assert cached == null || F.first(keys).equals(cached.key());
-
-        cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ);
-
-        groupLockSanityCheck(cacheCtx, keys);
-
-        boolean single = keysCnt == 1;
-
-        Collection<K> lockKeys = null;
-
-        long topVer = topologyVersion();
-
-        // In this loop we cover only read-committed or optimistic transactions.
-        // Transactions that are pessimistic and not read-committed are covered
-        // outside of this loop.
-        for (K key : keys) {
-            if (key == null)
-                continue;
-
-            if (pessimistic() && !readCommitted())
-                addActiveCache(cacheCtx);
-
-            GridCacheTxKey<K> txKey = cacheCtx.txKey(key);
-
-            // Check write map (always check writes first).
-            GridCacheTxEntry<K, V> txEntry = entry(txKey);
-
-            // Either non-read-committed or there was a previous write.
-            if (txEntry != null) {
-                if (cacheCtx.isAll(txEntry.cached(), filter)) {
-                    V val = txEntry.value();
-
-                    // Read value from locked entry in group-lock transaction as well.
-                    if (txEntry.hasValue()) {
-                        if (!F.isEmpty(txEntry.transformClosures())) {
-                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                val = clos.apply(val);
-                        }
-
-                        if (val != null) {
-                            V val0 = val;
-
-                            if (cacheCtx.portableEnabled())
-                                val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
-
-                            map.put(key, val0);
-                        }
-                    }
-                    else {
-                        assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
-
-                        while (true) {
-                            try {
-                                Object transformClo =
-                                    (txEntry.op() == TRANSFORM  && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                        F.first(txEntry.transformClosures()) : null;
-
-                                val = txEntry.cached().innerGet(this,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail fast*/true,
-                                    /*unmarshal*/true,
-                                    /*metrics*/true,
-                                    /*event*/true,
-                                    /*temporary*/false,
-                                    CU.subjectId(this, cctx),
-                                    transformClo,
-                                    resolveTaskName(),
-                                    filter);
-
-                                if (val != null) {
-                                    if (!readCommitted())
-                                        txEntry.readValue(val);
-
-                                    if (!F.isEmpty(txEntry.transformClosures())) {
-                                        for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                            val = clos.apply(val);
-                                    }
-
-                                    V val0 = val;
-
-                                    if (cacheCtx.portableEnabled())
-                                        val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
-
-                                    map.put(key, val0);
-                                }
-                                else
-                                    missed.put(key, txEntry.cached().version());
-
-                                break;
-                            }
-                            catch (GridCacheFilterFailedException e) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Filter validation failed for entry: " + txEntry);
-
-                                if (!readCommitted())
-                                    txEntry.readValue(e.<V>value());
-                            }
-                            catch (GridCacheEntryRemovedException ignored) {
-                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes());
-                            }
-                        }
-                    }
-                }
-            }
-            // First time access within transaction.
-            else {
-                if (lockKeys == null)
-                    lockKeys = single ? (Collection<K>)keys : new ArrayList<K>(keysCnt);
-
-                if (!single)
-                    lockKeys.add(key);
-
-                while (true) {
-                    GridCacheEntryEx<K, V> entry;
-
-                    if (cached != null) {
-                        entry = cached;
-
-                        cached = null;
-                    }
-                    else
-                        entry = entryEx(cacheCtx, txKey, topVer);
-
-                    try {
-                        GridCacheVersion ver = entry.version();
-
-                        V val = null;
-
-                        if (!pessimistic() || readCommitted() || groupLock()) {
-                            // This call will check for filter.
-                            val = entry.innerGet(this,
-                                /*swap*/true,
-                                /*no read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /*metrics*/true,
-                                /*event*/true,
-                                /*temporary*/false,
-                                CU.subjectId(this, cctx),
-                                null,
-                                resolveTaskName(),
-                                filter);
-
-                            if (val != null) {
-                                V val0 = val;
-
-                                if (cacheCtx.portableEnabled())
-                                    val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
-
-                                map.put(key, val0);
-                            }
-                            else
-                                missed.put(key, ver);
-                        }
-                        else
-                            // We must wait for the lock in pessimistic mode.
-                            missed.put(key, ver);
-
-                        if (!readCommitted()) {
-                            txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null);
-
-                            if (groupLock())
-                                txEntry.groupLockEntry(true);
-
-                            // As optimization, mark as checked immediately
-                            // for non-pessimistic if value is not null.
-                            if (val != null && !pessimistic())
-                                txEntry.markValid();
-                        }
-
-                        break; // While.
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
-                    }
-                    catch (GridCacheFilterFailedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Filter validation failed for entry: " + entry);
-
-                        if (!readCommitted()) {
-                            // Value for which failure occurred.
-                            V val = e.<V>value();
-
-                            txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null);
-
-                            // Mark as checked immediately for non-pessimistic.
-                            if (val != null && !pessimistic())
-                                txEntry.markValid();
-                        }
-
-                        break; // While loop.
-                    }
-                }
-            }
-        }
-
-        return lockKeys != null ? lockKeys : Collections.<K>emptyList();
-    }
-
-    /**
-     * Adds skipped key.
-     *
-     * @param skipped Skipped set (possibly {@code null}).
-     * @param key Key to add.
-     * @return Skipped set.
-     */
-    private Set<K> skip(Set<K> skipped, K key) {
-        if (skipped == null)
-            skipped = new GridLeanSet<>();
-
-        skipped.add(key);
-
-        if (log.isDebugEnabled())
-            log.debug("Added key to skipped set: " + key);
-
-        return skipped;
-    }
-
-    /**
-     * Loads all missed keys for
-     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
-     *
-     * @param map Return map.
-     * @param missedMap Missed keys.
-     * @param redos Keys to retry.
-     * @param deserializePortable Deserialize portable flag.
-     * @param filter Filter.
-     * @return Loaded key-value pairs.
-     */
-    private IgniteFuture<Map<K, V>> checkMissed(
-        final GridCacheContext<K, V> cacheCtx,
-        final Map<K, V> map,
-        final Map<K, GridCacheVersion> missedMap,
-        @Nullable final Collection<K> redos,
-        final boolean deserializePortable,
-        final IgnitePredicate<GridCacheEntry<K, V>>[] filter
-    ) {
-        assert redos != null || pessimistic();
-
-        if (log.isDebugEnabled())
-            log.debug("Loading missed values for missed map: " + missedMap);
-
-        final Collection<K> loaded = new HashSet<>();
-
-        return new GridEmbeddedFuture<>(cctx.kernalContext(),
-            loadMissing(
-                cacheCtx,
-                false, missedMap.keySet(), deserializePortable, new CI2<K, V>() {
-                /** */
-                private GridCacheVersion nextVer;
-
-                @Override public void apply(K key, V val) {
-                    if (isRollbackOnly()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Ignoring loaded value for read because transaction was rolled back: " +
-                                GridCacheTxLocalAdapter.this);
-
-                        return;
-                    }
-
-                    GridCacheVersion ver = missedMap.get(key);
-
-                    if (ver == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
-
-                        return;
-                    }
-
-                    V visibleVal = val;
-
-                    GridCacheTxKey<K> txKey = cacheCtx.txKey(key);
-
-                    GridCacheTxEntry<K, V> txEntry = entry(txKey);
-
-                    if (txEntry != null) {
-                        if (!readCommitted())
-                            txEntry.readValue(val);
-
-                        if (!F.isEmpty(txEntry.transformClosures())) {
-                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                visibleVal = clos.apply(visibleVal);
-                        }
-                    }
-
-                    // In pessimistic mode we hold the lock, so filter validation
-                    // should always be valid.
-                    if (pessimistic())
-                        ver = null;
-
-                    // Initialize next version.
-                    if (nextVer == null)
-                        nextVer = cctx.versions().next(topologyVersion());
-
-                    while (true) {
-                        assert txEntry != null || readCommitted() || groupLock();
-
-                        GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
-
-                        try {
-                            boolean pass = cacheCtx.isAll(e, filter);
-
-                            // Must initialize to true since even if filter didn't pass,
-                            // we still record the transaction value.
-                            boolean set = true;
-
-                            if (pass) {
-                                try {
-                                    set = e.versionedValue(val, ver, nextVer);
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed entry in transaction getAll method " +
-                                            "(will try again): " + e);
-
-                                    if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
-                                        (!groupLock() || F.eq(e.key(), groupLockKey()))) {
-                                        U.error(log, "Inconsistent transaction state (entry got removed while " +
-                                            "holding lock) [entry=" + e + ", tx=" + GridCacheTxLocalAdapter.this + "]");
-
-                                        setRollbackOnly();
-
-                                        return;
-                                    }
-
-                                    if (txEntry != null)
-                                        txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
-
-                                    continue; // While loop.
-                                }
-                            }
-
-                            // In pessimistic mode, we should always be able to set.
-                            assert set || !pessimistic();
-
-                            if (readCommitted() || groupLock()) {
-                                cacheCtx.evicts().touch(e, topologyVersion());
-
-                                if (pass && visibleVal != null)
-                                    map.put(key, visibleVal);
-                            }
-                            else {
-                                assert txEntry != null;
-
-                                if (set || F.isEmptyOrNulls(filter)) {
-                                    txEntry.setAndMarkValid(val);
-
-                                    if (pass && visibleVal != null)
-                                        map.put(key, visibleVal);
-                                }
-                                else {
-                                    assert !pessimistic() : "Pessimistic transaction should not have to redo gets: " +
-                                        this;
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to set versioned value for entry (will redo): " + e);
-
-                                    redos.add(key);
-                                }
-                            }
-
-                            loaded.add(key);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Set value loaded from store into entry from transaction [set=" + set +
-                                    ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
-
-                            break; // While loop.
-                        }
-                        catch (IgniteCheckedException ex) {
-                            throw new IgniteException("Failed to put value for cache entry: " + e, ex);
-                        }
-                    }
-                }
-            }),
-            new C2<Boolean, Exception, Map<K, V>>() {
-                @Override public Map<K, V> apply(Boolean b, Exception e) {
-                    if (e != null) {
-                        setRollbackOnly();
-
-                        throw new GridClosureException(e);
-                    }
-
-                    if (!b && !readCommitted()) {
-                        // There is no store - we must mark the entries.
-                        for (K key : missedMap.keySet()) {
-                            GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
-
-                            if (txEntry != null)
-                                txEntry.markValid();
-                        }
-                    }
-
-                    if (readCommitted()) {
-                        Collection<K> notFound = new HashSet<>(missedMap.keySet());
-
-                        notFound.removeAll(loaded);
-
-                        // In read-committed mode touch entries that have just been read.
-                        for (K key : notFound) {
-                            GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
-
-                            GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
-                                txEntry.cached();
-
-                            if (entry != null)
-                                cacheCtx.evicts().touch(entry, topologyVersion());
-                        }
-                    }
-
-                    return map;
-                }
-            });
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<Map<K, V>> getAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
-        Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable,
-        final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        if (F.isEmpty(keys))
-            return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
-
-        init();
-
-        int keysCnt = keys.size();
-
-        boolean single = keysCnt == 1;
-
-        try {
-            checkValid();
-
-            final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
-
-            final Map<K, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
-
-            final Collection<K> lockKeys = enlistRead(cacheCtx, keys, cached, retMap, missed, keysCnt,
-                deserializePortable, filter);
-
-            if (single && missed.isEmpty())
-                return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
-
-            // Handle locks.
-            if (pessimistic() && !readCommitted() && !groupLock()) {
-                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true,
-                    isolation, isInvalidate(), CU.<K, V>empty());
-
-                PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
-                    @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException {
-                        if (log.isDebugEnabled())
-                            log.debug("Acquired transaction lock for read on keys: " + lockKeys);
-
-                        // Load keys only after the locks have been acquired.
-                        for (K key : lockKeys) {
-                            if (retMap.containsKey(key))
-                                // We already have a return value.
-                                continue;
-
-                            GridCacheTxKey<K> txKey = cacheCtx.txKey(key);
-
-                            GridCacheTxEntry<K, V> txEntry = entry(txKey);
-
-                            assert txEntry != null;
-
-                            // Check if there is cached value.
-                            while (true) {
-                                GridCacheEntryEx<K, V> cached = txEntry.cached();
-
-                                try {
-                                    Object transformClo =
-                                        (!F.isEmpty(txEntry.transformClosures()) &&
-                                            cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                            F.first(txEntry.transformClosures()) : null;
-
-                                    V val = cached.innerGet(GridCacheTxLocalAdapter.this,
-                                        cacheCtx.isSwapOrOffheapEnabled(),
-                                        /*read-through*/false,
-                                        /*fail-fast*/true,
-                                        /*unmarshal*/true,
-                                        /*metrics*/true,
-                                        /*events*/true,
-                                        /*temporary*/true,
-                                        CU.subjectId(GridCacheTxLocalAdapter.this, cctx),
-                                        transformClo,
-                                        resolveTaskName(),
-                                        filter);
-
-                                    // If value is in cache and passed the filter.
-                                    if (val != null) {
-                                        missed.remove(key);
-
-                                        txEntry.setAndMarkValid(val);
-
-                                        if (!F.isEmpty(txEntry.transformClosures())) {
-                                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                                val = clos.apply(val);
-                                        }
-
-                                        if (cacheCtx.portableEnabled())
-                                            val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
-
-                                        retMap.put(key, val);
-                                    }
-
-                                    // Even though we bring the value back from lock acquisition,
-                                    // we still need to recheck primary node for consistent values
-                                    // in case of concurrent transactional locks.
-
-                                    break; // While.
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed exception in get postLock (will retry): " +
-                                            cached);
-
-                                    txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
-                                }
-                                catch (GridCacheFilterFailedException e) {
-                                    // Failed value for the filter.
-                                    V val = e.value();
-
-                                    if (val != null) {
-                                        // If filter fails after lock is acquired, we don't reload,
-                                        // regardless if value is null or not.
-                                        missed.remove(key);
-
-                                        txEntry.setAndMarkValid(val);
-                                    }
-
-                                    break; // While.
-                                }
-                            }
-                        }
-
-                        if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal()))
-                            return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, filter);
-
-                        return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
-                    }
-                };
-
-                FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
-                    @Override Map<K, V> finish(Map<K, V> loaded) {
-                        retMap.putAll(loaded);
-
-                        return retMap;
-                    }
-                };
-
-                if (fut.isDone()) {
-                    try {
-                        IgniteFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
-
-                        return fut1.isDone() ?
-                            new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) :
-                            new GridEmbeddedFuture<>(cctx.kernalContext(), fut1, finClos);
-                    }
-                    catch (GridClosureException e) {
-                        return new GridFinishedFuture<>(cctx.kernalContext(), e.unwrap());
-                    }
-                    catch (IgniteCheckedException e) {
-                        try {
-                            return plc2.apply(false, e);
-                        }
-                        catch (Exception e1) {
-                            return new GridFinishedFuture<>(cctx.kernalContext(), e1);
-                        }
-                    }
-                }
-                else {
-                    return new GridEmbeddedFuture<>(
-                        cctx.kernalContext(),
-                        fut,
-                        plc2,
-                        finClos);
-                }
-            }
-            else {
-                assert optimistic() || readCommitted() || groupLock();
-
-                final Collection<K> redos = new LinkedList<>();
-
-                if (!missed.isEmpty()) {
-                    if (!readCommitted())
-                        for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); )
-                            if (retMap.containsKey(it.next()))
-                                it.remove();
-
-                    if (missed.isEmpty())
-                        return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
-
-                    return new GridEmbeddedFuture<>(
-                        cctx.kernalContext(),
-                        // First future.
-                        checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter),
-                        // Closure that returns another future, based on result from first.
-                        new PMC<Map<K, V>>() {
-                            @Override public IgniteFuture<Map<K, V>> postMiss(Map<K, V> map) {
-                                if (redos.isEmpty())
-                                    return new GridFinishedFuture<>(cctx.kernalContext(),
-                                        Collections.<K, V>emptyMap());
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Starting to future-recursively get values for keys: " + redos);
-
-                                // Future recursion.
-                                return getAllAsync(cacheCtx, redos, null, deserializePortable, filter);
-                            }
-                        },
-                        // Finalize.
-                        new FinishClosure<Map<K, V>>() {
-                            @Override Map<K, V> finish(Map<K, V> loaded) {
-                                for (Map.Entry<K, V> entry : loaded.entrySet()) {
-                                    GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(entry.getKey()));
-
-                                    V val = entry.getValue();
-
-                                    if (!readCommitted())
-                                        txEntry.readValue(val);
-
-                                    if (!F.isEmpty(txEntry.transformClosures())) {
-                                        for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                            val = clos.apply(val);
-                                    }
-
-                                    retMap.put(entry.getKey(), val);
-                                }
-
-                                return retMap;
-                            }
-                        }
-                    );
-                }
-
-                return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            setRollbackOnly();
-
-            return new GridFinishedFuture<>(cctx.kernalContext(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync(
-        GridCacheContext<K, V> cacheCtx,
-        Map<? extends K, ? extends V> map,
-        boolean retval,
-        @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter
-    ) {
-        return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> putAllDrAsync(
-        GridCacheContext<K, V> cacheCtx,
-        Map<? extends K, GridCacheDrInfo<V>> drMap
-    ) {
-        return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync(
-        GridCacheContext<K, V> cacheCtx,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map,
-        boolean retval,
-        @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl
-    ) {
-        return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> removeAllDrAsync(
-        GridCacheContext<K, V> cacheCtx,
-        Map<? extends K, GridCacheVersion> drMap
-    ) {
-        return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
-    }
-
-    /**
-     * Checks filter for non-pessimistic transactions.
-     *
-     * @param cached Cached entry.
-     * @param filter Filter to check.
-     * @return {@code True} if passed or pessimistic.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean filter(GridCacheEntryEx<K, V> cached,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
-        return pessimistic() || cached.context().isAll(cached, filter);
-    }
-
-    /**
-     * Internal routine for <tt>putAll(..)</tt>
-     *
-     * @param keys Keys to enlist.
-     * @param cached Cached entry.
-     * @param ttl Time to live for entry. If negative, leave unchanged.
-     * @param implicit Implicit flag.
-     * @param lookup Value lookup map ({@code null} for remove).
-     * @param transformMap Map with transform closures if this is a transform operation.
-     * @param retval Flag indicating whether a value should be returned.
-     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
-     * @param filter User filters.
-     * @param ret Return value.
-     * @param enlisted Collection of keys enlisted into this transaction.
-     * @param drPutMap DR put map (optional).
-     * @param drRmvMap DR remove map (optional).
-     * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
-     */
-    protected IgniteFuture<Set<K>> enlistWrite(
-        GridCacheContext<K, V> cacheCtx,
-        Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl,
-        boolean implicit,
-        @Nullable Map<? extends K, ? extends V> lookup,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
-        boolean retval,
-        boolean lockOnly,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        final GridCacheReturn<V> ret,
-        Collection<K> enlisted,
-        @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap,
-        @Nullable Map<? extends K, GridCacheVersion> drRmvMap
-    ) {
-        assert cached == null || keys.size() == 1;
-        assert cached == null || F.first(keys).equals(cached.key());
-
-        try {
-            addActiveCache(cacheCtx);
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(cctx.kernalContext(), e);
-        }
-
-        Set<K> skipped = null;
-
-        boolean rmv = lookup == null && transformMap == null;
-
-        try {
-            // Set transform flag for transaction.
-            if (transformMap != null)
-                transform = true;
-
-            groupLockSanityCheck(cacheCtx, keys);
-
-            for (K key : keys) {
-                V val = rmv || lookup == null ? null : lookup.get(key);
-                IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key);
-
-                GridCacheVersion drVer;
-                long drTtl;
-                long drExpireTime;
-
-                if (drPutMap != null) {
-                    GridCacheDrInfo<V> info = drPutMap.get(key);
-
-                    assert info != null;
-
-                    drVer = info.version();
-                    drTtl = info.ttl();
-                    drExpireTime = info.expireTime();
-                }
-                else if (drRmvMap != null) {
-                    assert drRmvMap.get(key) != null;
-
-                    drVer = drRmvMap.get(key);
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-                else {
-                    drVer = null;
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-
-                if (key == null)
-                    continue;
-
-                if (!rmv && val == null && transformClo == null) {
-                    skipped = skip(skipped, key);
-
-                    continue;
-                }
-
-                if (cacheCtx.portableEnabled())
-                    key = (K)cacheCtx.marshalToPortable(key);
-
-                GridCacheTxKey<K> txKey = cacheCtx.txKey(key);
-
-                GridCacheTxEntry<K, V> txEntry = entry(txKey);
-
-                // First time access.
-                if (txEntry == null) {
-                    while (true) {
-                        GridCacheEntryEx<K, V> entry;
-
-                        if (cached != null) {
-                            entry = cached;
-
-                            cached = null;
-                        }
-                        else {
-                            entry = entryEx(cacheCtx, txKey, topologyVersion());
-
-                            entry.unswap(true, false);
-                        }
-
-                        try {
-                            // Check if lock is being explicitly acquired by the same thread.
-                            if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
-                                entry.lockedByThread(threadId, xidVer))
-                                throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
-                                    "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
-                                    ", threadId=" + threadId +
-                                    ", locNodeId=" + cctx.localNodeId() + ']');
-
-                            V old = null;
-
-                            boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
-
-                            if (optimistic()) {
-                                try {
-                                    //Should read through if filter is specified.
-                                    old = entry.innerGet(this,
-                                        /*swap*/false,
-                                        /*read-through*/readThrough,
-                                        /*fail-fast*/false,
-                                        /*unmarshal*/retval,
-                                        /*metrics*/retval,
-                                        /*events*/retval,
-                                        /*temporary*/false,
-                                        CU.subjectId(this, cctx),
-                                        transformClo,
-                                        resolveTaskName(),
-                                        CU.<K, V>empty());
-                                }
-                                catch (GridCacheFilterFailedException e) {
-                                    e.printStackTrace();
-
-                                    assert false : "Empty filter failed: " + e;
-                                }
-                            }
-                            else
-                                old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
-
-                            if (!filter(entry, filter)) {
-                                skipped = skip(skipped, key);
-
-                                ret.set(old, false);
-
-                                if (!readCommitted() && old != null) {
-                                    // Enlist failed filters as reads for non-read-committed mode,
-                                    // so future ops will get the same values.
-                                    txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L,
-                                        null);
-
-                                    txEntry.markValid();
-                                }
-
-                                if (readCommitted() || old == null)
-                                    cacheCtx.evicts().touch(entry, topologyVersion());
-
-                                break; // While.
-                            }
-
-                            txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
-                                old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
-                                drExpireTime, drVer);
-
-                            if (!implicit() && readCommitted())
-                                cacheCtx.evicts().touch(entry, topologyVersion());
-
-                            if (groupLock() && !lockOnly)
-                                txEntry.groupLockEntry(true);
-
-                            enlisted.add(key);
-
-                            if (!pessimistic() || (groupLock() && !lockOnly)) {
-                                txEntry.markValid();
-
-                                if (old == null) {
-                                    if (retval && !readThrough) {
-                                        // If return value is required, then we know for sure that there is only
-                                        // one key in the keys collection.
-                                        assert keys.size() == 1;
-
-                                        IgniteFuture<Boolean> fut = loadMissing(
-                                            cacheCtx,
-                                            true,
-                                            F.asList(key),
-                                            deserializePortables(cacheCtx),
-                                            new CI2<K, V>() {
-                                                @Override public void apply(K k, V v) {
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Loaded value from remote node [key=" + k + ", val=" +
-                                                            v + ']');
-
-                                                    ret.set(v, true);
-                                                }
-                                            });
-
-                                        return new GridEmbeddedFuture<>(
-                                            cctx.kernalContext(),
-                                            fut,
-                                            new C2<Boolean, Exception, Set<K>>() {
-                                                @Override public Set<K> apply(Boolean b, Exception e) {
-                                                    if (e != null)
-                                                        throw new GridClosureException(e);
-
-                                                    return Collections.emptySet();
-                                                }
-                                            }
-                                        );
-                                    }
-                                    else
-                                        ret.set(null, true);
-                                }
-                                else
-                                    ret.set(old, true);
-                            }
-                            // Pessimistic.
-                            else
-                                ret.set(old, true);
-
-                            break; // While.
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            if (log.isDebugEnabled())
-                                log.debug("Got removed entry in transaction putAll0 method: " + entry);
-                        }
-                    }
-                }
-                else {
-                    if (transformClo == null && txEntry.op() == TRANSFORM)
-                        throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
-                            "transaction after transform closure is applied): " + key);
-
-                    GridCacheEntryEx<K, V> entry = txEntry.cached();
-
-                    V v = txEntry.value();
-
-                    boolean del = txEntry.op() == DELETE && rmv;
-
-                    if (!del) {
-                        if (!filter(entry, filter)) {
-                            skipped = skip(skipped, key);
-
-                            ret.set(v, false);
-
-                            continue;
-                        }
-
-                        txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
-                            v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
-                            drExpireTime, drVer);
-
-                        enlisted.add(key);
-                    }
-
-                    if (!pessimistic()) {
-                        txEntry.markValid();
-
-                        // Set tx entry and return values.
-                        ret.set(v, true);
-                    }
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(cctx.kernalContext(), e);
-        }
-
-        return new GridFinishedFuture<>(cctx.kernalContext(), skipped);
-    }
-
-    /**
-     * Post lock processing for put or remove.
-     *
-     * @param keys Keys.
-     * @param failed Collection of potentially failed keys (need to populate in this method).
-     * @param transformed Output map where transformed values will be placed.
-     * @param transformMap Transform map.
-     * @param ret Return value.
-     * @param rmv {@code True} if remove.
-     * @param retval Flag to return value or not.
-     * @param filter Filter to check entries.
-     * @return Failed keys.
-     * @throws IgniteCheckedException If error.
-     */
-    protected Set<K> postLockWrite(
-        GridCacheContext<K, V> cacheCtx,
-        Iterable<? extends K> keys,
-        Set<K> failed,
-        @Nullable Map<K, V> transformed,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
-        GridCacheReturn<V> ret,
-        boolean rmv,
-        boolean retval,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter
-    ) throws IgniteCheckedException {
-        for (K k : keys) {
-            GridCacheTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k));
-
-            if (txEntry == null)
-                throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
-                    "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
-
-            while (true) {
-                GridCacheEntryEx<K, V> cached = txEntry.cached();
-
-                try {
-                    assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
-                        "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
-                            ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
-
-                    if (log.isDebugEnabled())
-                        log.debug("Post lock write entry: " + cached);
-
-                    V v = txEntry.previousValue();
-                    boolean hasPrevVal = txEntry.hasPreviousValue();
-
-                    if (onePhaseCommit())
-                        filter = txEntry.filters();
-
-                    // If we have user-passed filter, we must read value into entry for peek().
-                    if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
-                        retval = true;
-
-                    if (retval) {
-                        if (!cacheCtx.isNear()) {
-                            try {
-                                if (!hasPrevVal)
-                                    v = cached.innerGet(this,
-                                        /*swap*/retval,
-                                        /*read-through*/retval,
-                                        /*failFast*/false,
-                                        /*unmarshal*/retval,
-                                        /*metrics*/true,
-                                        /*event*/!dht(),
-                                        /*temporary*/false,
-                                        CU.subjectId(this, cctx),
-                                        null,
-                                        resolveTaskName(),
-                                        CU.<K, V>empty());
-                            }
-                            catch (GridCacheFilterFailedException e) {
-                                e.printStackTrace();
-
-                                assert false : "Empty filter failed: " + e;
-                            }
-                        }
-                        else {
-                            if (!hasPrevVal)
-                                v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet();
-                        }
-
-                        ret.value(v);
-                    }
-
-                    boolean pass = cacheCtx.isAll(cached, filter);
-
-                    // For remove operation we return true only if we are removing s/t,
-                    // i.e. cached value is not null.
-                    ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
-
-                    if (onePhaseCommit())
-                        txEntry.filtersPassed(pass);
-
-                    if (pass) {
-                        txEntry.markValid();
-
-                        if (log.isDebugEnabled())
-                            log.debug("Filter passed in post lock for key: " + k);
-                    }
-                    else {
-                        failed = skip(failed, k);
-
-                        // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
-                        txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value());
-                        txEntry.filters(CU.<K, V>empty());
-                        txEntry.filtersSet(false);
-                    }
-
-                    break; // While.
-                }
-                // If entry cached within transaction got removed before lock.
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
-
-                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()), txEntry.keyBytes());
-                }
-            }
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Entries that failed after lock filter check: " + failed);
-
-        return failed;
-    }
-
-    /**
-     * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
-     * maps must be non-null.
-     *
-     * @param map Key-value map to store.
-     * @param transformMap Transform map.
-     * @param drMap DR map.
-     * @param retval Key-transform value map to store.
-     * @param cached Cached entry, if any.
-     * @param ttl Time to live.
-     * @param filter Filter.
-     * @return Operation future.
-     */
-    private IgniteFuture<GridCacheReturn<V>> putAllAsync0(
-        final GridCacheCon

<TRUNCATED>