You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 15:26:48 UTC

[41/49] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 0834e88..fcbf58d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -574,19 +574,42 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.
                                         explicitVer = null;
 
+                                    GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
+
                                     if (op == CREATE || op == UPDATE) {
                                         // Invalidate only for near nodes (backups cannot be invalidated).
                                         if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
-                                            cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
-                                                topVer, null, replicate ? DR_BACKUP : DR_NONE,
+                                            cached.innerRemove(this,
+                                                eventNodeId(),
+                                                nodeId,
+                                                false,
+                                                false,
+                                                true,
+                                                true,
+                                                topVer,
+                                                null,
+                                                replicate ? DR_BACKUP : DR_NONE,
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                dhtVer);
                                         else {
-                                            cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
-                                                txEntry.ttl(), true, true, topVer, null,
-                                                replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
-                                                near() ? null : explicitVer, CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                            cached.innerSet(this,
+                                                eventNodeId(),
+                                                nodeId,
+                                                val,
+                                                false,
+                                                false,
+                                                txEntry.ttl(),
+                                                true,
+                                                true,
+                                                topVer,
+                                                null,
+                                                replicate ? DR_BACKUP : DR_NONE,
+                                                txEntry.conflictExpireTime(),
+                                                near() ? null : explicitVer,
+                                                CU.subjectId(this, cctx),
+                                                resolveTaskName(),
+                                                dhtVer);
 
                                             // Keep near entry up to date.
                                             if (nearCached != null) {
@@ -602,9 +625,20 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         }
                                     }
                                     else if (op == DELETE) {
-                                        cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
-                                            topVer, null, replicate ? DR_BACKUP : DR_NONE,
-                                            near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName());
+                                        cached.innerRemove(this,
+                                            eventNodeId(),
+                                            nodeId,
+                                            false,
+                                            false,
+                                            true,
+                                            true,
+                                            topVer,
+                                            null,
+                                            replicate ? DR_BACKUP : DR_NONE,
+                                            near() ? null : explicitVer,
+                                            CU.subjectId(this, cctx),
+                                            resolveTaskName(),
+                                            dhtVer);
 
                                         // Keep near entry up to date.
                                         if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
new file mode 100644
index 0000000..721ba4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ * Base class for distributed get futures: stores get parameters and provides a versioned-result helper.
+ */
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+    implements GridCacheFuture<Map<K, V>> {
+    /** Default max remap count value. */
+    public static final int DFLT_MAX_REMAP_CNT = 3;
+
+    /** Maximum number of attempts to remap key to the same primary node ({@code IGNITE_NEAR_GET_MAX_REMAPS}). */
+    protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+
+    /** Context. */
+    protected final GridCacheContext<K, V> cctx;
+
+    /** Keys. */
+    protected Collection<KeyCacheObject> keys;
+
+    /** Read through flag. */
+    protected boolean readThrough;
+
+    /** Force primary flag. */
+    protected boolean forcePrimary;
+
+    /** Future ID. */
+    protected IgniteUuid futId;
+
+    /** Trackable flag. */
+    protected boolean trackable;
+
+    /** Remap count. */
+    protected AtomicInteger remapCnt = new AtomicInteger();
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name. */
+    protected String taskName;
+
+    /** Whether to deserialize portable objects. */
+    protected boolean deserializePortable;
+
+    /** Skip values flag. */
+    protected boolean skipVals;
+
+    /** Expiry policy. */
+    protected IgniteCacheExpiryPolicy expiryPlc;
+
+    /** Flag indicating whether future can be remapped on a newer topology version. */
+    protected final boolean canRemap;
+
+    /** If {@code true}, values are returned as tuples containing value and version. */
+    protected final boolean needVer;
+
+    /** Keep cache objects flag. */
+    protected final boolean keepCacheObjects;
+
+    /**
+     * @param cctx Context.
+     * @param keys Keys.
+     * @param readThrough Read through flag.
+     * @param forcePrimary If {@code true} then will force network trip to primary node even
+     *          if called on backup node.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObjects Keep cache objects flag.
+     */
+    protected CacheDistributedGetFutureAdapter(
+        GridCacheContext<K, V> cctx,
+        Collection<KeyCacheObject> keys,
+        boolean readThrough,
+        boolean forcePrimary,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObjects
+    ) {
+        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+
+        assert !F.isEmpty(keys);
+
+        this.cctx = cctx;
+        this.keys = keys;
+        this.readThrough = readThrough;
+        this.forcePrimary = forcePrimary;
+        this.subjId = subjId;
+        this.taskName = taskName;
+        this.deserializePortable = deserializePortable;
+        this.expiryPlc = expiryPlc;
+        this.skipVals = skipVals;
+        this.canRemap = canRemap;
+        this.needVer = needVer;
+        this.keepCacheObjects = keepCacheObjects;
+
+        futId = IgniteUuid.randomUuid();
+    }
+
+    /**
+     * @param map Result map; receives {@code key} mapped to a tuple of value and version.
+     * @param key Key.
+     * @param val Value; stored as {@code true} when {@code skipVals} is set.
+     * @param ver Version, asserted to be non-null.
+     */
+    @SuppressWarnings("unchecked")
+    protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) {
+        assert needVer;
+        assert skipVals || val != null;
+        assert ver != null;
+
+        map.put(key, new T2<>(skipVals ? true : val, ver));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9d02705..bdd1140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CI3;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -562,7 +563,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
      * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)}
+     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)}
      * method instead to retrieve DHT value.
      *
      * @param keys {@inheritDoc}
@@ -574,7 +575,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable Collection<? extends K> keys,
         boolean forcePrimary,
         boolean skipTx,
-        @Nullable GridCacheEntryEx entry,
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
@@ -585,7 +585,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         return getAllAsync(keys,
             opCtx == null || !opCtx.skipStore(),
-            null,
             /*don't check local tx. */false,
             subjId,
             taskName,
@@ -603,9 +602,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param taskName Task name.
      * @param expiry Expiry policy.
      * @param skipVals Skip values flag.
+     * @param canRemap Can remap flag.
      * @return Get future.
      */
-    IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync(
+    IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync(
         Collection<KeyCacheObject> keys,
         boolean readThrough,
         @Nullable UUID subjId,
@@ -623,7 +623,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             expiry,
             skipVals,
             /*keep cache objects*/true,
-            canRemap);
+            canRemap,
+            /*need version*/true);
     }
 
     /**
@@ -631,18 +632,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param msgId Message ID.
      * @param keys Keys to get.
      * @param readThrough Read through flag.
-     * @param reload Reload flag.
      * @param topVer Topology version.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param expiry Expiry policy.
+     * @param skipVals Skip values flag.
      * @return DHT future.
      */
     public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader,
         long msgId,
         LinkedHashMap<KeyCacheObject, Boolean> keys,
         boolean readThrough,
-        boolean reload,
         AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
         int taskNameHash,
@@ -653,7 +653,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             reader,
             keys,
             readThrough,
-            reload,
             /*tx*/null,
             topVer,
             subjId,
@@ -672,6 +671,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      */
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
         assert ctx.affinityNode();
+        assert !req.reload() : req;
 
         long ttl = req.accessTtl();
 
@@ -682,7 +682,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 req.messageId(),
                 req.keys(),
                 req.readThrough(),
-                req.reload(),
                 req.topologyVersion(),
                 req.subjectId(),
                 req.taskNameHash(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..1b2d834 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -163,6 +163,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @param topVer Topology version.
      * @param threadId Owning thread ID.
      * @param ver Lock version.
+     * @param serOrder Version for serializable transactions ordering.
+     * @param serReadVer Optional read entry version for optimistic serializable transaction.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Tx flag.
@@ -177,10 +179,17 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         AffinityTopologyVersion topVer,
         long threadId,
         GridCacheVersion ver,
+        @Nullable GridCacheVersion serOrder,
+        @Nullable GridCacheVersion serReadVer,
         long timeout,
         boolean reenter,
         boolean tx,
-        boolean implicitSingle) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
+        boolean implicitSingle)
+        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException
+    {
+        assert serReadVer == null || serOrder != null;
+        assert !reenter || serOrder == null;
+
         GridCacheMvccCandidate cand;
         GridCacheMvccCandidate prev;
         GridCacheMvccCandidate owner;
@@ -213,6 +222,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                 threadId,
                 ver,
                 timeout,
+                serOrder,
                 reenter,
                 tx,
                 implicitSingle,
@@ -235,12 +245,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
             val = this.val;
 
-            if (mvcc != null && mvcc.isEmpty())
+            if (mvcc.isEmpty())
                 mvccExtras(null);
         }
 
         // Don't link reentries.
-        if (cand != null && !cand.reentry())
+        if (!cand.reentry())
             // Link with other candidates in the same thread.
             cctx.mvcc().addNext(cctx, cand);
 
@@ -250,7 +260,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean tmLock(IgniteInternalTx tx, long timeout)
+    @Override public boolean tmLock(IgniteInternalTx tx,
+        long timeout,
+        @Nullable GridCacheVersion serOrder,
+        GridCacheVersion serReadVer)
         throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
         if (tx.local()) {
             GridDhtTxLocalAdapter dhtTx = (GridDhtTxLocalAdapter)tx;
@@ -262,6 +275,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                 tx.topologyVersion(),
                 tx.threadId(),
                 tx.xidVersion(),
+                serOrder,
+                serReadVer,
                 timeout,
                 /*reenter*/false,
                 /*tx*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index a67b1de..7108da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -17,16 +17,16 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
@@ -72,9 +73,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     /** */
     private UUID reader;
 
-    /** Reload flag. */
-    private boolean reload;
-
     /** Read through flag. */
     private boolean readThrough;
 
@@ -120,7 +118,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * @param reader Reader.
      * @param keys Keys.
      * @param readThrough Read through flag.
-     * @param reload Reload flag.
      * @param tx Transaction.
      * @param topVer Topology version.
      * @param subjId Subject ID.
@@ -134,7 +131,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         UUID reader,
         LinkedHashMap<KeyCacheObject, Boolean> keys,
         boolean readThrough,
-        boolean reload,
         @Nullable IgniteTxLocalEx tx,
         @NotNull AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
@@ -152,7 +148,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         this.msgId = msgId;
         this.keys = keys;
         this.readThrough = readThrough;
-        this.reload = reload;
         this.tx = tx;
         this.topVer = topVer;
         this.subjId = subjId;
@@ -291,8 +286,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(
                 Collections.<GridCacheEntryInfo>emptyList());
 
-        final Collection<GridCacheEntryInfo> infos = new LinkedList<>();
-
         String taskName0 = cctx.kernalContext().job().currentTaskName();
 
         if (taskName0 == null)
@@ -302,89 +295,77 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
         GridCompoundFuture<Boolean, Boolean> txFut = null;
 
-        for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
-            while (true) {
-                GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
+        ClusterNode readerNode = cctx.discovery().node(reader);
 
-                try {
-                    GridCacheEntryInfo info = e.info();
+        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+            for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
+                while (true) {
+                    GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
 
-                    // If entry is obsolete.
-                    if (info == null)
-                        continue;
+                    try {
+                        if (e.obsolete())
+                            continue;
 
-                    boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+                        boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
 
-                    if (addReader)
-                        e.unswap(false);
+                        if (addReader)
+                            e.unswap(false);
 
-                    // Register reader. If there are active transactions for this entry,
-                    // then will wait for their completion before proceeding.
-                    // TODO: GG-4003:
-                    // TODO: What if any transaction we wait for actually removes this entry?
-                    // TODO: In this case seems like we will be stuck with untracked near entry.
-                    // TODO: To fix, check that reader is contained in the list of readers once
-                    // TODO: again after the returned future completes - if not, try again.
-                    // TODO: Also, why is info read before transactions are complete, and not after?
-                    IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
+                        // Register reader. If there are active transactions for this entry,
+                        // then will wait for their completion before proceeding.
+                        // TODO: GG-4003:
+                        // TODO: What if any transaction we wait for actually removes this entry?
+                        // TODO: In this case seems like we will be stuck with untracked near entry.
+                        // TODO: To fix, check that reader is contained in the list of readers once
+                        // TODO: again after the returned future completes - if not, try again.
+                        IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
 
-                    if (f != null) {
-                        if (txFut == null)
-                            txFut = new GridCompoundFuture<>(CU.boolReducer());
-
-                        txFut.add(f);
-                    }
+                        if (f != null) {
+                            if (txFut == null)
+                                txFut = new GridCompoundFuture<>(CU.boolReducer());
 
-                    infos.add(info);
+                            txFut.add(f);
+                        }
 
-                    break;
-                }
-                catch (IgniteCheckedException err) {
-                    return new GridFinishedFuture<>(err);
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry when getting a DHT value: " + e);
-                }
-                finally {
-                    cctx.evicts().touch(e, topVer);
+                        break;
+                    }
+                    catch (IgniteCheckedException err) {
+                        return new GridFinishedFuture<>(err);
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry when getting a DHT value: " + e);
+                    }
+                    finally {
+                        cctx.evicts().touch(e, topVer);
+                    }
                 }
             }
-        }
 
-        if (txFut != null)
-            txFut.markInitialized();
+            if (txFut != null)
+                txFut.markInitialized();
+        }
 
-        IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut;
+        IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
 
         if (txFut == null || txFut.isDone()) {
-            if (reload && cctx.readThrough() && cctx.store().configured()) {
-                fut = cache().reloadAllAsync0(keys.keySet(),
-                    true,
-                    skipVals,
+            if (tx == null) {
+                fut = cache().getDhtAllAsync(
+                    keys.keySet(),
+                    readThrough,
                     subjId,
-                    taskName);
+                    taskName,
+                    expiryPlc,
+                    skipVals,
+                    /*can remap*/true);
             }
             else {
-                if (tx == null) {
-                    fut = cache().getDhtAllAsync(
-                        keys.keySet(),
-                        readThrough,
-                        subjId,
-                        taskName,
-                        expiryPlc,
-                        skipVals,
-                        /*can remap*/true);
-                }
-                else {
-                    fut = tx.getAllAsync(cctx,
-                        keys.keySet(),
-                        null,
-                        /*deserialize portable*/false,
-                        skipVals,
-                        /*keep cache objects*/true,
-                        /*skip store*/!readThrough);
-                }
+                fut = tx.getAllAsync(cctx,
+                    keys.keySet(),
+                    /*deserialize portable*/false,
+                    skipVals,
+                    /*keep cache objects*/true,
+                    /*skip store*/!readThrough);
             }
         }
         else {
@@ -393,38 +374,28 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             // transactions to complete.
             fut = new GridEmbeddedFuture<>(
                 txFut,
-                new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() {
-                    @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) {
+                new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+                    @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) {
                         if (e != null)
                             throw new GridClosureException(e);
 
-                        if (reload && cctx.readThrough() && cctx.store().configured()) {
-                            return cache().reloadAllAsync0(keys.keySet(),
-                                true,
-                                skipVals,
+                        if (tx == null) {
+                            return cache().getDhtAllAsync(
+                                keys.keySet(),
+                                readThrough,
                                 subjId,
-                                taskName);
+                                taskName,
+                                expiryPlc,
+                                skipVals,
+                                /*can remap*/true);
                         }
                         else {
-                            if (tx == null) {
-                                return cache().getDhtAllAsync(
-                                    keys.keySet(),
-                                    readThrough,
-                                    subjId,
-                                    taskName,
-                                    expiryPlc,
-                                    skipVals,
-                                    /*can remap*/true);
-                            }
-                            else {
-                                return tx.getAllAsync(cctx,
-                                    keys.keySet(),
-                                    null,
-                                    /*deserialize portable*/false,
-                                    skipVals,
-                                    /*keep cache objects*/true,
-                                    /*skip store*/!readThrough);
-                            }
+                            return tx.getAllAsync(cctx,
+                                keys.keySet(),
+                                /*deserialize portable*/false,
+                                skipVals,
+                                /*keep cache objects*/true,
+                                /*skip store*/!readThrough);
                         }
                     }
                 }
@@ -432,23 +403,29 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         }
 
         return new GridEmbeddedFuture<>(
-            new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() {
-                @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) {
+            new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() {
+                @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) {
                     if (e != null) {
                         onDone(e);
 
                         return Collections.emptyList();
                     }
                     else {
-                        for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
-                            GridCacheEntryInfo info = it.next();
+                        Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
 
-                            Object v = map.get(info.key());
+                        for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
+                            T2<CacheObject, GridCacheVersion> val = entry.getValue();
 
-                            if (v == null)
-                                it.remove();
-                            else
-                                info.value(skipVals ? null : (CacheObject)v);
+                            assert val != null;
+
+                            GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                            info.cacheId(cctx.cacheId());
+                            info.key(entry.getKey());
+                            info.value(skipVals ? null : val.get1());
+                            info.version(val.get2());
+
+                            infos.add(info);
                         }
 
                         return infos;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 4f3e97d..c175b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -380,9 +380,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      * @throws GridDistributedLockCancelledException If lock is canceled.
+     * @throws IgniteCheckedException If failed.
      */
     @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry)
-        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
+        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding entry: " + entry);
 
@@ -400,6 +401,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             topVer,
             threadId,
             lockVer,
+            null,
+            null,
             timeout,
             /*reenter*/false,
             inTx(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c09a611..4ce4759 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -672,9 +672,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry when adding lock (will retry): " + entry);
                     }
-                    catch (GridDistributedLockCancelledException e) {
+                    catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
                         if (log.isDebugEnabled())
-                            log.debug("Got lock request for cancelled lock (will fail): " + entry);
+                            log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
 
                         return new GridDhtFinishedFuture<>(e);
                     }
@@ -1106,62 +1106,55 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             if (tx == null || !tx.isRollbackOnly()) {
                                 GridCacheVersion dhtVer = req.dhtVersion(i);
 
-                                try {
-                                    GridCacheVersion ver = e.version();
-
-                                    boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
-
-                                    CacheObject val = null;
-
-                                    if (ret)
-                                        val = e.innerGet(tx,
-                                            /*swap*/true,
-                                            /*read-through*/false,
-                                            /*fail-fast.*/false,
-                                            /*unmarshal*/false,
-                                            /*update-metrics*/true,
-                                            /*event notification*/req.returnValue(i),
-                                            /*temporary*/false,
-                                            CU.subjectId(tx, ctx.shared()),
-                                            null,
-                                            tx != null ? tx.resolveTaskName() : null,
-                                            null);
-
-                                    assert e.lockedBy(mappedVer) ||
-                                        (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
-                                        "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
-                                            ", entry=" + e +
-                                            ", mappedVer=" + mappedVer + ", ver=" + ver +
-                                            ", tx=" + tx + ", req=" + req +
-                                            ", err=" + err + ']';
-
-                                    boolean filterPassed = false;
-
-                                    if (tx != null && tx.onePhaseCommit()) {
-                                        IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key()));
-
-                                        assert writeEntry != null :
-                                            "Missing tx entry for locked cache entry: " + e;
-
-                                        filterPassed = writeEntry.filtersPassed();
-                                    }
-
-                                    if (ret && val == null)
-                                        val = e.valueBytes(null);
-
-                                    // We include values into response since they are required for local
-                                    // calls and won't be serialized. We are also including DHT version.
-                                    res.addValueBytes(
-                                        ret ? val : null,
-                                        filterPassed,
-                                        ver,
-                                        mappedVer);
-                                }
-                                catch (GridCacheFilterFailedException ex) {
-                                    assert false : "Filter should never fail if fail-fast is false.";
+                                GridCacheVersion ver = e.version();
+
+                                boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+
+                                CacheObject val = null;
+
+                                if (ret)
+                                    val = e.innerGet(tx,
+                                        /*swap*/true,
+                                        /*read-through*/false,
+                                        /*fail-fast.*/false,
+                                        /*unmarshal*/false,
+                                        /*update-metrics*/true,
+                                        /*event notification*/req.returnValue(i),
+                                        /*temporary*/false,
+                                        CU.subjectId(tx, ctx.shared()),
+                                        null,
+                                        tx != null ? tx.resolveTaskName() : null,
+                                        null);
+
+                                assert e.lockedBy(mappedVer) ||
+                                    (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
+                                    "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
+                                        ", entry=" + e +
+                                        ", mappedVer=" + mappedVer + ", ver=" + ver +
+                                        ", tx=" + tx + ", req=" + req +
+                                        ", err=" + err + ']';
+
+                                boolean filterPassed = false;
 
-                                    ex.printStackTrace();
+                                if (tx != null && tx.onePhaseCommit()) {
+                                    IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key()));
+
+                                    assert writeEntry != null :
+                                        "Missing tx entry for locked cache entry: " + e;
+
+                                    filterPassed = writeEntry.filtersPassed();
                                 }
+
+                                if (ret && val == null)
+                                    val = e.valueBytes(null);
+
+                                // We include values into response since they are required for local
+                                // calls and won't be serialized. We are also including DHT version.
+                                res.addValueBytes(
+                                    ret ? val : null,
+                                    filterPassed,
+                                    ver,
+                                    mappedVer);
                             }
                             else {
                                 // We include values into response since they are required for local

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 4f8469f..44f34aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -595,7 +595,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 if (finish(false) || state() == UNKNOWN)
                     fut.finish();
                 else
-                    fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this)));
+                    fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(this)));
             }
             catch (IgniteTxOptimisticCheckedException e) {
                 if (log.isDebugEnabled())
@@ -627,7 +627,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                         if (finish(false) || state() == UNKNOWN)
                             fut.finish();
                         else
-                            fut.onError(new IgniteCheckedException("Failed to commit transaction: " +
+                            fut.onError(new IgniteCheckedException("Failed to rollback transaction: " +
                                 CU.txString(GridDhtTxLocal.this)));
 
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a15a334..d806801 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -57,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridLeanSet;
@@ -429,9 +429,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             catch (GridCacheEntryRemovedException e) {
                 assert false : "Got entry removed exception while holding transactional lock on entry: " + e;
             }
-            catch (GridCacheFilterFailedException e) {
-                assert false : "Got filter failed exception with fail fast false " + e;
-            }
         }
     }
 
@@ -472,8 +469,18 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (log.isDebugEnabled())
             log.debug("Marking all local candidates as ready: " + this);
 
-        Iterable<IgniteTxEntry> checkEntries = writes;
+        readyLocks(writes);
+
+        if (tx.serializable() && tx.optimistic())
+            readyLocks(reads);
 
+        locksReady = true;
+    }
+
+    /**
+     * @param checkEntries Entries.
+     */
+    private void readyLocks(Iterable<IgniteTxEntry> checkEntries) {
         for (IgniteTxEntry txEntry : checkEntries) {
             GridCacheContext cacheCtx = txEntry.context();
 
@@ -513,8 +520,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
             }
         }
-
-        locksReady = true;
     }
 
     /**
@@ -813,12 +818,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         this.writes = writes;
         this.txNodes = txNodes;
 
-        if (!F.isEmpty(writes)) {
+        boolean ser = tx.serializable() && tx.optimistic();
+
+        if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
             Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
 
             for (IgniteTxEntry entry : writes)
                 forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
 
+            if (ser) {
+                for (IgniteTxEntry entry : reads)
+                    forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+            }
+
             forceKeysFut = forceRebalanceKeys(forceKeys);
         }
 
@@ -847,7 +859,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         IgniteTxEntry e,
         Map<Integer, Collection<KeyCacheObject>> map
     ) {
-        if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) {
+        if (retVal ||
+            !F.isEmpty(e.entryProcessors()) ||
+            !F.isEmpty(e.filters()) ||
+            e.serializableReadVersion() != null) {
             if (map == null)
                 map = new HashMap<>();
 
@@ -906,10 +921,86 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * @param entries Tx entries; each one with a serializable read version is checked against its cached entry.
+     * @return Version check error for the first entry that fails the check, {@code null} if no entry fails.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private IgniteCheckedException checkReadConflict(Iterable<IgniteTxEntry> entries)
+        throws IgniteCheckedException {
+        try {
+            for (IgniteTxEntry entry : entries) {
+                GridCacheVersion serReadVer = entry.serializableReadVersion();
+
+                if (serReadVer != null) { // Entries without a recorded read version are skipped.
+                    entry.cached().unswap();
+
+                    if (!entry.cached().checkSerializableReadVersion(serReadVer))
+                        return versionCheckError(entry);
+                }
+            }
+        }
+        catch (GridCacheEntryRemovedException e) {
+            assert false : "Got removed exception on entry with dht local candidate: " + entries;
+        }
+
+        return null; // Also reached from the catch above when assertions are disabled.
+    }
+
+    /**
+     * @param entry Tx entry whose key and cache name are included in the error message.
+     * @return Optimistic exception reporting a read/write conflict on the entry's key.
+     */
+    private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry) {
+        GridCacheContext cctx = entry.context();
+
+        return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " +
+            "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) +
+            ", cache=" + cctx.name() + ']');
+    }
+
+    /**
      *
      */
     private void prepare0() {
         try {
+            if (tx.serializable() && tx.optimistic()) {
+                IgniteCheckedException err0;
+
+                try {
+                    err0 = checkReadConflict(writes);
+
+                    if (err0 == null)
+                        err0 = checkReadConflict(reads);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to check entry version: " + e, e);
+
+                    err0 = e;
+                }
+
+                if (err0 != null) {
+                    err.compareAndSet(null, err0);
+
+                    final GridNearTxPrepareResponse res = createPrepareResponse();
+
+                    tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                            if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) {
+                                try {
+                                    if (replied.compareAndSet(false, true))
+                                        sendPrepareResponse(res);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+                                }
+                            }
+                        }
+                    });
+
+                    return;
+                }
+            }
+
             // We are holding transaction-level locks for entries here, so we can get next write version.
             onEntriesLocked();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index a68e834..febe9ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -24,11 +24,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -39,8 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -58,6 +53,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -65,83 +61,30 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-
 /**
  * Colocated get future.
  */
-public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
+public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Default max remap count value. */
-    public static final int DFLT_MAX_REMAP_CNT = 3;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
     /** Logger. */
     private static IgniteLogger log;
 
-    /** Maximum number of attempts to remap key to the same primary node. */
-    private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(IGNITE_NEAR_GET_MAX_REMAPS,
-        DFLT_MAX_REMAP_CNT);
-
-    /** Context. */
-    private final GridCacheContext<K, V> cctx;
-
-    /** Keys. */
-    private Collection<KeyCacheObject> keys;
-
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** Reload flag. */
-    private boolean reload;
-
-    /** Read-through flag. */
-    private boolean readThrough;
-
-    /** Force primary flag. */
-    private boolean forcePrimary;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
     /** Version. */
     private GridCacheVersion ver;
 
-    /** Trackable flag. */
-    private volatile boolean trackable;
-
-    /** Remap count. */
-    private AtomicInteger remapCnt = new AtomicInteger();
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name. */
-    private String taskName;
-
-    /** Whether to deserialize portable objects. */
-    private boolean deserializePortable;
-
-    /** Expiry policy. */
-    private IgniteCacheExpiryPolicy expiryPlc;
-
-    /** Skip values flag. */
-    private boolean skipVals;
-
-    /** Flag indicating whether future can be remapped on a newer topology version. */
-    private final boolean canRemap;
-
     /**
      * @param cctx Context.
      * @param keys Keys.
      * @param topVer Topology version.
      * @param readThrough Read through flag.
-     * @param reload Reload flag.
      * @param forcePrimary If {@code true} then will force network trip to primary node even
      *          if called on backup node.
      * @param subjId Subject ID.
@@ -149,39 +92,39 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObjects Keep cache objects flag.
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
         Collection<KeyCacheObject> keys,
         AffinityTopologyVersion topVer,
         boolean readThrough,
-        boolean reload,
         boolean forcePrimary,
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean canRemap
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObjects
     ) {
-        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
-
-        assert !F.isEmpty(keys);
+        super(cctx,
+            keys,
+            readThrough,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            needVer,
+            keepCacheObjects);
 
-        this.cctx = cctx;
-        this.keys = keys;
         this.topVer = topVer;
-        this.readThrough = readThrough;
-        this.reload = reload;
-        this.forcePrimary = forcePrimary;
-        this.subjId = subjId;
-        this.deserializePortable = deserializePortable;
-        this.taskName = taskName;
-        this.expiryPlc = expiryPlc;
-        this.skipVals = skipVals;
-        this.canRemap = canRemap;
-
-        futId = IgniteUuid.randomUuid();
 
         ver = cctx.versions().next();
 
@@ -351,7 +294,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         -1,
                         mappedKeys,
                         readThrough,
-                        reload,
                         topVer,
                         subjId,
                         taskName == null ? 0 : taskName.hashCode(),
@@ -404,7 +346,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     ver,
                     mappedKeys,
                     readThrough,
-                    reload,
                     topVer,
                     subjId,
                     taskName == null ? 0 : taskName.hashCode(),
@@ -452,10 +393,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
 
         while (true) {
-            GridCacheEntryEx entry = null;
+            GridCacheEntryEx entry;
 
             try {
-                if (!reload && allowLocRead) {
+                if (allowLocRead) {
                     try {
                         entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
                             colocated.peekEx(key);
@@ -464,18 +405,40 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         if (entry != null) {
                             boolean isNew = entry.isNewLocked();
 
-                            CacheObject v = entry.innerGet(null,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc);
+                            CacheObject v = null;
+                            GridCacheVersion ver = null;
+
+                            if (needVer) {
+                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                    null,
+                                    /*swap*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!skipVals,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiryPlc);
+
+                                if (res != null) {
+                                    v = res.get1();
+                                    ver = res.get2();
+                                }
+                            }
+                            else {
+                                v = entry.innerGet(null,
+                                    /*swap*/true,
+                                    /*read-through*/false,
+                                    /*fail-fast*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!skipVals,
+                                    /*temporary*/false,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiryPlc);
+                            }
 
                             colocated.context().evicts().touch(entry, topVer);
 
@@ -485,7 +448,16 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                                     colocated.removeIfObsolete(key);
                             }
                             else {
-                                cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
+                                if (needVer)
+                                    versionedResult(locVals, key, v, ver);
+                                else
+                                    cctx.addResult(locVals,
+                                        key,
+                                        v,
+                                        skipVals,
+                                        keepCacheObjects,
+                                        deserializePortable,
+                                        true);
 
                                 return false;
                             }
@@ -536,14 +508,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             catch (GridCacheEntryRemovedException ignored) {
                 // No-op, will retry.
             }
-            catch (GridCacheFilterFailedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Filter validation failed for entry: " + e);
-
-                colocated.context().evicts().touch(entry, topVer);
-
-                break;
-            }
         }
 
         return remote;
@@ -591,7 +555,16 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             for (GridCacheEntryInfo info : infos) {
                 assert skipVals == (info.value() == null);
 
-                cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false);
+                if (needVer)
+                    versionedResult(map, info.key(), info.value(), info.version());
+                else
+                    cctx.addResult(map,
+                        info.key(),
+                        info.value(),
+                        skipVals,
+                        keepCacheObjects,
+                        deserializePortable,
+                        false);
             }
 
             return map;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cba6872..4cd9e84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -306,7 +306,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final Collection<? extends K> keys,
         final boolean forcePrimary,
         boolean skipTx,
-        @Nullable final GridCacheEntryEx entry,
         @Nullable UUID subjId,
         final String taskName,
         final boolean deserializePortable,
@@ -334,7 +333,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
             @Override public IgniteInternalFuture<Map<K, V>> apply() {
                 return getAllAsync0(ctx.cacheKeysView(keys),
-                    false,
                     forcePrimary,
                     subjId0,
                     taskName,
@@ -920,7 +918,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * Entry point to all public API get methods.
      *
      * @param keys Keys to remove.
-     * @param reload Reload flag.
      * @param forcePrimary Force primary flag.
      * @param subjId Subject ID.
      * @param taskName Task name.
@@ -931,7 +928,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return Get future.
      */
     private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys,
-        boolean reload,
         boolean forcePrimary,
         UUID subjId,
         String taskName,
@@ -947,7 +943,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
-        if (!reload && !forcePrimary) {
+        if (!forcePrimary) {
             Map<K, V> locVals = U.newHashMap(keys.size());
 
             boolean success = true;
@@ -997,10 +993,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     catch (GridCacheEntryRemovedException ignored) {
                         // No-op, retry.
                     }
-                    catch (GridCacheFilterFailedException ignored) {
-                        // No-op, skip the key.
-                        break;
-                    }
                     catch (GridDhtInvalidPartitionException ignored) {
                         success = false;
 
@@ -1036,14 +1028,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             keys,
             topVer,
             !skipStore,
-            reload,
             forcePrimary,
             subjId,
             taskName,
             deserializePortable,
             expiry,
             skipVals,
-            canRemap);
+            canRemap,
+            false,
+            false);
 
         fut.init();
 
@@ -1663,6 +1656,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (idx != null) {
                         GridDhtCacheEntry entry = entries.get(idx);
+
                         try {
                             GridCacheVersion ver = entry.version();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 6d69198..f03b461 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
@@ -68,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -188,7 +188,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable final Collection<? extends K> keys,
         boolean forcePrimary,
         boolean skipTx,
-        @Nullable final GridCacheEntryEx entry,
         @Nullable UUID subjId,
         String taskName,
         final boolean deserializePortable,
@@ -212,7 +211,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
                     return tx.getAllAsync(ctx,
                         ctx.cacheKeysView(keys),
-                        entry,
                         deserializePortable,
                         skipVals,
                         false,
@@ -230,7 +228,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         return loadAsync(
             ctx.cacheKeysView(keys),
             opCtx == null || !opCtx.skipStore(),
-            false,
             forcePrimary,
             topVer,
             subjId,
@@ -257,7 +254,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     /**
      * @param keys Keys to load.
      * @param readThrough Read through flag.
-     * @param reload Reload flag.
      * @param forcePrimary Force get from primary node flag.
      * @param topVer Topology version.
      * @param subjId Subject ID.
@@ -265,12 +261,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param canRemap Can remap flag.
      * @return Loaded values.
      */
     public IgniteInternalFuture<Map<K, V>> loadAsync(
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
-        boolean reload,
         boolean forcePrimary,
         AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
@@ -278,7 +274,45 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean canRemap
+        boolean canRemap) {
+        return loadAsync(keys,
+            readThrough,
+            forcePrimary,
+            topVer, subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            false,
+            false);
+    }
+
+    /**
+     * @param keys Keys to load.
+     * @param readThrough Read through flag.
+     * @param forcePrimary Force get from primary node flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @return Loaded values.
+     */
+    public IgniteInternalFuture<Map<K, V>> loadAsync(
+        @Nullable Collection<KeyCacheObject> keys,
+        boolean readThrough,
+        boolean forcePrimary,
+        AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObj
     ) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -287,8 +321,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc = expiryPolicy(null);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
-        if (!reload && !forcePrimary) {
-            Map<K, V> locVals = U.newHashMap(keys.size());
+        if (!forcePrimary) {
+            Map<K, V> locVals = null;
 
             boolean success = true;
 
@@ -304,18 +338,40 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         if (entry != null) {
                             boolean isNew = entry.isNewLocked();
 
-                            CacheObject v = entry.innerGet(null,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc);
+                            CacheObject v = null;
+                            GridCacheVersion ver = null;
+
+                            if (needVer) {
+                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                    null,
+                                    /*swap*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!skipVals,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiryPlc);
+
+                                if (res != null) {
+                                    v = res.get1();
+                                    ver = res.get2();
+                                }
+                            }
+                            else {
+                                v = entry.innerGet(null,
+                                    /*swap*/true,
+                                    /*read-through*/false,
+                                    /*fail-fast*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!skipVals,
+                                    /*temporary*/false,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiryPlc);
+                            }
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
@@ -326,8 +382,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                                 success = false;
                             }
-                            else
-                                ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
+                            else {
+                                if (locVals == null)
+                                    locVals = U.newHashMap(keys.size());
+
+                                if (needVer)
+                                    locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver));
+                                else {
+                                    ctx.addResult(locVals,
+                                        key,
+                                        v,
+                                        skipVals,
+                                        keepCacheObj,
+                                        deserializePortable,
+                                        true);
+                                }
+                            }
                         }
                         else
                             success = false;
@@ -337,10 +407,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                     catch (GridCacheEntryRemovedException ignored) {
                         // No-op, retry.
                     }
-                    catch (GridCacheFilterFailedException ignored) {
-                        // No-op, skip the key.
-                        break;
-                    }
                     catch (GridDhtInvalidPartitionException ignored) {
                         success = false;
 
@@ -377,14 +443,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             keys,
             topVer,
             readThrough,
-            reload,
             forcePrimary,
             subjId,
             taskName,
             deserializePortable,
             expiryPlc,
             skipVals,
-            canRemap);
+            canRemap,
+            needVer,
+            keepCacheObj);
 
         fut.init();
 
@@ -803,10 +870,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry when adding lock (will retry): " + entry);
                     }
-                    catch (GridDistributedLockCancelledException e) {
+                    catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
                         if (log.isDebugEnabled())
-                            log.debug("Got lock request for cancelled lock (will ignore): " +
-                                entry);
+                            log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
 
                         fut.onError(e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 53c2b63..365b46b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -323,7 +323,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     inTx(),
                     inTx() && tx.implicitSingle(),
                     false,
-                    false);
+                    false,
+                    null);
 
                 cand.topologyVersion(topVer.get());
             }
@@ -342,7 +343,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     inTx(),
                     inTx() && tx.implicitSingle(),
                     false,
-                    false);
+                    false,
+                    null);
 
                 cand.topologyVersion(topVer.get());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 82054d9..1bf03a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -385,7 +385,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         @Nullable Collection<? extends K> keys,
         boolean forcePrimary,
         boolean skipTx,
-        @Nullable GridCacheEntryEx entry,
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
@@ -406,7 +405,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         return loadAsync(null,
             ctx.cacheKeysView(keys),
-            false,
             forcePrimary,
             subjId,
             taskName,