You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/07 16:34:18 UTC

[2/2] ignite git commit: ignite-1607 WIP

ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6849ebe1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6849ebe1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6849ebe1

Branch: refs/heads/ignite-1607-read
Commit: 6849ebe10779265cbd22f1afb35bb40c12529881
Parents: 54bbc75
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 7 10:33:34 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 7 17:31:09 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |    5 +-
 .../processors/cache/GridCacheMapEntry.java     |    5 +-
 .../dht/CacheDistributedGetFutureAdapter.java   |  203 +++
 .../distributed/dht/GridDhtCacheEntry.java      |    2 +-
 .../dht/GridPartitionedGetFuture.java           |  106 +-
 .../dht/colocated/GridDhtColocatedCache.java    |    1 +
 .../distributed/near/GridNearGetFuture.java     |  101 +-
 ...arOptimisticSerializableTxPrepareFuture.java |    2 +-
 .../cache/distributed/near/GridNearTxLocal.java |   42 +-
 .../cache/transactions/IgniteTxAdapter.java     |    6 +-
 .../cache/transactions/IgniteTxEntry.java       |    3 +-
 .../transactions/IgniteTxLocalAdapter.java      |  457 ++++---
 .../CacheSerializableTransactionsTest.java      | 1230 +++++++++++++++---
 .../processors/cache/GridCacheTestEntryEx.java  |    4 +-
 14 files changed, 1629 insertions(+), 538 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index bc36d2c..9106b05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -309,6 +309,7 @@ public interface GridCacheEntryEx {
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
 
     /**
+     * @param tx Cache transaction.
      * @param readSwap Flag indicating whether to check swap memory.
      * @param unmarshal Unmarshal flag.
      * @param updateMetrics If {@code true} then metrics should be updated.
@@ -321,7 +322,9 @@ public interface GridCacheEntryEx {
      * @throws IgniteCheckedException If loading value failed.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap,
+    @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+        IgniteInternalTx tx,
+        boolean readSwap,
         boolean unmarshal,
         boolean updateMetrics,
         boolean evt,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b22f9b4..9378017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -691,6 +691,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
     /** {@inheritDoc} */
     @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+        IgniteInternalTx tx,
         boolean readSwap,
         boolean unmarshal,
         boolean updateMetrics,
@@ -700,7 +701,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiryPlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        return (T2<CacheObject, GridCacheVersion>)innerGet0(null,
+        return (T2<CacheObject, GridCacheVersion>)innerGet0(tx,
             readSwap,
             false,
             evt,
@@ -711,7 +712,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             transformClo,
             taskName,
             expiryPlc,
-            false);
+            true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
new file mode 100644
index 0000000..459362b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ *
+ */
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+    implements GridCacheFuture<Map<K, V>> {
+    /** Default max remap count value. */
+    public static final int DFLT_MAX_REMAP_CNT = 3;
+
+    /** Maximum number of attempts to remap key to the same primary node. */
+    protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+
+    /** Context. */
+    protected final GridCacheContext<K, V> cctx;
+
+    /** Keys. */
+    protected Collection<KeyCacheObject> keys;
+
+    /** Reload flag. */
+    protected boolean reload;
+
+    /** Read through flag. */
+    protected boolean readThrough;
+
+    /** Force primary flag. */
+    protected boolean forcePrimary;
+
+    /** Future ID. */
+    protected IgniteUuid futId;
+
+    /** Trackable flag. */
+    protected boolean trackable;
+
+    /** Remap count. */
+    protected AtomicInteger remapCnt = new AtomicInteger();
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name. */
+    protected String taskName;
+
+    /** Whether to deserialize portable objects. */
+    protected boolean deserializePortable;
+
+    /** Skip values flag. */
+    protected boolean skipVals;
+
+    /** Expiry policy. */
+    protected IgniteCacheExpiryPolicy expiryPlc;
+
+    /** Flag indicating that get should be done on a locked topology version. */
+    protected final boolean canRemap;
+
+    /** */
+    protected final boolean needVer;
+
+    /** */
+    protected final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC;
+
+    /**
+     * @param cctx Context.
+     * @param keys Keys.
+     * @param readThrough Read through flag.
+     * @param reload Reload flag.
+     * @param forcePrimary If {@code true} then will force network trip to primary node even
+     *          if called on backup node.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param resC Closure applied on 'get' result.
+     * @param needVer If {@code true} need provide entry version to result closure.
+     */
+    protected CacheDistributedGetFutureAdapter(
+        GridCacheContext<K, V> cctx,
+        Collection<KeyCacheObject> keys,
+        boolean readThrough,
+        boolean reload,
+        boolean forcePrimary,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
+    ) {
+        super(cctx.kernalContext(),
+            resC != null ? new ResultClosureReducer<K, V>(keys.size()) : CU.<K, V>mapsReducer(keys.size()));
+
+        assert !F.isEmpty(keys);
+        assert !needVer || resC != null;
+
+        this.cctx = cctx;
+        this.keys = keys;
+        this.readThrough = readThrough;
+        this.reload = reload;
+        this.forcePrimary = forcePrimary;
+        this.subjId = subjId;
+        this.taskName = taskName;
+        this.deserializePortable = deserializePortable;
+        this.expiryPlc = expiryPlc;
+        this.skipVals = skipVals;
+        this.canRemap = canRemap;
+        this.needVer = needVer;
+        this.resC = resC;
+
+        futId = IgniteUuid.randomUuid();
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     */
+    @SuppressWarnings("unchecked")
+    protected final void resultClosureValue(KeyCacheObject key, Object val, GridCacheVersion ver) {
+        assert resC != null;
+
+        ResultClosureReducer<K, V> rdc = (ResultClosureReducer)reducer();
+
+        assert rdc != null;
+
+        rdc.collect(key);
+
+        resC.apply(key, val, ver);
+    }
+
+    /**
+     *
+     */
+    private static class ResultClosureReducer<K, V> implements IgniteReducer<Map<K, V>, Map<K, V>>  {
+        /** */
+        private final ConcurrentHashMap8<KeyCacheObject, Boolean> map;
+
+        /**
+         * @param keys Number of keys.
+         */
+        public ResultClosureReducer(int keys) {
+            this.map = new ConcurrentHashMap8<>(keys);
+        }
+
+        /**
+         * @param key Key.
+         */
+        void collect(KeyCacheObject key) {
+            map.put(key, Boolean.TRUE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean collect(@Nullable Map<K, V> map) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<K, V> reduce() {
+            return (Map)map;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 90f6551..0004f02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -199,7 +199,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
             if (serReadVer != null) {
                 if (!serReadVer.equals(this.ver)) {
-                    if (!(isNewLocked() && serReadVer.equals(IgniteTxEntry.SER_READ_NEW_ENTRY_VER)))
+                    if (!((isNew() || deleted()) && serReadVer.equals(IgniteTxEntry.READ_NEW_ENTRY_VER)))
                         return null;
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6b8c2ab..d8456d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -24,11 +24,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -40,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -67,83 +63,25 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-
 /**
  * Colocated get future.
  */
-public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
+public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Default max remap count value. */
-    public static final int DFLT_MAX_REMAP_CNT = 3;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
     /** Logger. */
     private static IgniteLogger log;
 
-    /** Maximum number of attempts to remap key to the same primary node. */
-    private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(IGNITE_NEAR_GET_MAX_REMAPS,
-        DFLT_MAX_REMAP_CNT);
-
-    /** Context. */
-    private final GridCacheContext<K, V> cctx;
-
-    /** Keys. */
-    private Collection<KeyCacheObject> keys;
-
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** Reload flag. */
-    private boolean reload;
-
-    /** Read-through flag. */
-    private boolean readThrough;
-
-    /** Force primary flag. */
-    private boolean forcePrimary;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
     /** Version. */
     private GridCacheVersion ver;
 
-    /** Trackable flag. */
-    private volatile boolean trackable;
-
-    /** Remap count. */
-    private AtomicInteger remapCnt = new AtomicInteger();
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name. */
-    private String taskName;
-
-    /** Whether to deserialize portable objects. */
-    private boolean deserializePortable;
-
-    /** Expiry policy. */
-    private IgniteCacheExpiryPolicy expiryPlc;
-
-    /** Skip values flag. */
-    private boolean skipVals;
-
-    /** Flag indicating whether future can be remapped on a newer topology version. */
-    private final boolean canRemap;
-
-    /** */
-    private final boolean needVer;
-
-    /** */
-    private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC;
-
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -158,6 +96,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
      * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} need provide entry version to result closure.
      * @param resC Closure applied on 'get' result.
      */
     public GridPartitionedGetFuture(
@@ -176,27 +115,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         boolean needVer,
         @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
     ) {
-        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+        super(cctx,
+            keys,
+            readThrough,
+            reload,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            needVer,
+            resC);
 
-        assert !F.isEmpty(keys);
-        assert !needVer || resC != null;
-
-        this.cctx = cctx;
-        this.keys = keys;
         this.topVer = topVer;
-        this.readThrough = readThrough;
-        this.reload = reload;
-        this.forcePrimary = forcePrimary;
-        this.subjId = subjId;
-        this.deserializePortable = deserializePortable;
-        this.taskName = taskName;
-        this.expiryPlc = expiryPlc;
-        this.skipVals = skipVals;
-        this.canRemap = canRemap;
-        this.needVer = needVer;
-        this.resC = resC;
-
-        futId = IgniteUuid.randomUuid();
 
         ver = cctx.versions().next();
 
@@ -331,7 +264,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
 
         final int keysSize = keys.size();
 
-        Map<K, V> locVals = U.newHashMap(keysSize);
+        Map<K, V> locVals = resC == null ? U.<K, V>newHashMap(keysSize) : null;
 
         boolean hasRmtNodes = false;
 
@@ -342,7 +275,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         if (isDone())
             return;
 
-        if (!locVals.isEmpty())
+        if (!F.isEmpty(locVals))
             add(new GridFinishedFuture<>(locVals));
 
         if (hasRmtNodes) {
@@ -483,6 +416,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
 
                             if (needVer) {
                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                    null,
                                     /*swap*/true,
                                     /*unmarshal*/true,
                                     /**update-metrics*/false,
@@ -521,7 +455,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                             }
                             else {
                                 if (resC != null)
-                                    resC.apply(key, skipVals ? true : v, ver);
+                                    resultClosureValue(key, skipVals ? true : v, ver);
                                 else
                                     cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
 
@@ -628,7 +562,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                 for (GridCacheEntryInfo info : infos) {
                     assert skipVals == (info.value() == null);
 
-                    resC.apply(info.key(), skipVals ? true : info.value(), info.version());
+                    resultClosureValue(info.key(), skipVals ? true : info.value(), info.version());
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index de82068..6b6352f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -353,6 +353,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                             if (needVer) {
                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                    null,
                                     /*swap*/true,
                                     /*unmarshal*/true,
                                     /**update-metrics*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 3b70325..9ed63ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -39,18 +38,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -68,83 +66,26 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 
 /**
  *
  */
-public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
+public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Default max remap count value. */
-    public static final int DFLT_MAX_REMAP_CNT = 3;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
     /** Logger. */
     private static IgniteLogger log;
 
-    /** Maximum number of attempts to remap key to the same primary node. */
-    private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
-
-    /** Context. */
-    private final GridCacheContext<K, V> cctx;
-
-    /** Keys. */
-    private Collection<KeyCacheObject> keys;
-
-    /** Reload flag. */
-    private boolean reload;
-
-    /** Read through flag. */
-    private boolean readThrough;
-
-    /** Force primary flag. */
-    private boolean forcePrimary;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Version. */
-    private GridCacheVersion ver;
-
     /** Transaction. */
     private IgniteTxLocalEx tx;
 
-    /** Trackable flag. */
-    private boolean trackable;
-
-    /** Remap count. */
-    private AtomicInteger remapCnt = new AtomicInteger();
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name. */
-    private String taskName;
-
-    /** Whether to deserialize portable objects. */
-    private boolean deserializePortable;
-
-    /** Skip values flag. */
-    private boolean skipVals;
-
-    /** Expiry policy. */
-    private IgniteCacheExpiryPolicy expiryPlc;
-
-    /** Flag indicating that get should be done on a locked topology version. */
-    private final boolean canRemap;
-
     /** */
-    private final boolean needVer;
-
-    /** */
-    private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC;
+    private GridCacheVersion ver;
 
     /**
      * @param cctx Context.
@@ -159,6 +100,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} need provide entry version to result closure.
+     * @param resC Closure applied on 'get' result.
      */
     public GridNearGetFuture(
         GridCacheContext<K, V> cctx,
@@ -176,25 +120,24 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         boolean needVer,
         @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
     ) {
-        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+        super(cctx,
+            keys,
+            readThrough,
+            reload,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            needVer,
+            resC);
 
         assert !F.isEmpty(keys);
         assert !needVer || resC != null;
 
-        this.cctx = cctx;
-        this.keys = keys;
-        this.readThrough = readThrough;
-        this.reload = reload;
-        this.forcePrimary = forcePrimary;
         this.tx = tx;
-        this.subjId = subjId;
-        this.taskName = taskName;
-        this.deserializePortable = deserializePortable;
-        this.expiryPlc = expiryPlc;
-        this.skipVals = skipVals;
-        this.canRemap = canRemap;
-        this.needVer = needVer;
-        this.resC = resC;
 
         futId = IgniteUuid.randomUuid();
 
@@ -474,6 +417,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                 if (isNear) {
                     if (needVer) {
                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
                             /*swap*/true,
                             /*unmarshal*/true,
                             /**update-metrics*/true,
@@ -520,6 +464,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
                             if (needVer) {
                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                    null,
                                     /*swap*/true,
                                     /*unmarshal*/true,
                                     /**update-metrics*/false,
@@ -595,7 +540,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
                     }
                     else
-                        resC.apply(key, v, ver);
+                        resultClosureValue(key, v, ver);
                 }
                 else {
                     if (affNode == null) {
@@ -761,7 +706,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     assert skipVals == (info.value() == null);
 
                     if (resC != null)
-                        resC.apply(key, skipVals ? true : val, info.version());
+                        resultClosureValue(key, skipVals ? true : val, info.version());
                     else
                         cctx.addResult(map, key, val, skipVals, false, deserializePortable, false);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 4a7efb4..e48601d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -570,7 +570,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         // Must lock near entries separately.
         if (m.near()) {
             try {
-                tx.optimisticLockEntries(req.writes());
+                tx.optimisticLockEntries(m.entries());
 
                 tx.userPrepare();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 2c2915f..c43cab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -362,7 +363,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 needVer,
                 c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
                 @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
-                    return true;
+                    try {
+                        Map<Object, Object> map = f.get();
+
+                        if (map != null && map.size() != keys.size()) {
+                            for (KeyCacheObject key : keys) {
+                                if (!map.containsKey(key))
+                                    c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
+                            }
+                        }
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        setRollbackOnly();
+
+                        throw new GridClosureException(e);
+                    }
                 }
             });
         }
@@ -383,7 +400,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 c
             ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
                 @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
-                    return true;
+                    try {
+                        Map<Object, Object> map = f.get();
+
+                        if (map != null && map.size() != keys.size()) {
+                            for (KeyCacheObject key : keys) {
+                                if (!map.containsKey(key))
+                                    c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
+                            }
+                        }
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        setRollbackOnly();
+
+                        throw new GridClosureException(e);
+                    }
                 }
             });
         } else {
@@ -868,7 +901,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
         @Nullable Collection<IgniteTxEntry> reads,
         @Nullable Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes, boolean last,
+        Map<UUID, Collection<UUID>> txNodes,
+        boolean last,
         Collection<UUID> lastBackups
     ) {
         if (state() != PREPARING) {
@@ -896,7 +930,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         try {
             // At this point all the entries passed in must be enlisted in transaction because this is an
             // optimistic transaction.
-            optimisticLockEntries = writes;
+            optimisticLockEntries = optimistic() && serializable() ? F.concat(false, writes, reads) : writes;
 
             userPrepare();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0286efe..c896f6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -325,7 +325,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
 
         threadId = Thread.currentThread().getId();
 
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, this);
     }
 
     /**
@@ -374,7 +375,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         implicitSingle = false;
         loc = false;
 
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, this);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f1cd2d4..7929167 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -67,7 +67,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     private static final long serialVersionUID = 0L;
 
     /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */
-    public static final GridCacheVersion SER_READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0);
+    public static final GridCacheVersion READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0);
 
     /** Owning transaction. */
     @GridToStringExclude
@@ -322,6 +322,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         cp.conflictVer = conflictVer;
         cp.expiryPlc = expiryPlc;
         cp.flags = flags;
+        cp.serReadVer = serReadVer;
 
         return cp;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2b745ac..76df164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -1268,11 +1269,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         AffinityTopologyVersion topVer = topologyVersion();
 
+        boolean needReadVer = optimistic() && serializable();
+
         // In this loop we cover only read-committed or optimistic transactions.
         // Transactions that are pessimistic and not read-committed are covered
         // outside of this loop.
         for (KeyCacheObject key : keys) {
-            if (pessimistic() && !readCommitted() && !skipVals)
+            if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
                 addActiveCache(cacheCtx);
 
             IgniteTxKey txKey = cacheCtx.txKey(key);
@@ -1370,24 +1373,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         GridCacheVersion ver = entry.version();
 
                         CacheObject val = null;
+                        GridCacheVersion readVer = null;
 
                         if (!pessimistic() || readCommitted() && !skipVals) {
                             IgniteCacheExpiryPolicy accessPlc =
                                 optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
 
-                            // This call will check for filter.
-                            val = entry.innerGet(this,
-                                /*swap*/true,
-                                /*no read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /*metrics*/true,
-                                /*event*/true,
-                                /*temporary*/false,
-                                CU.subjectId(this, cctx),
-                                null,
-                                resolveTaskName(),
-                                accessPlc);
+                            if (needReadVer) {
+                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
+                                    /*swap*/true,
+                                    /*unmarshal*/true,
+                                    /*metrics*/true,
+                                    /*event*/true,
+                                    CU.subjectId(this, cctx),
+                                    null,
+                                    resolveTaskName(),
+                                    accessPlc);
+
+                                if (res != null) {
+                                    val = res.get1();
+                                    readVer = res.get2();
+                                }
+                            }
+                            else {
+                                val = entry.innerGet(this,
+                                    /*swap*/true,
+                                    /*no read-through*/false,
+                                    /*fail-fast*/true,
+                                    /*unmarshal*/true,
+                                    /*metrics*/true,
+                                    /*event*/true,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    null,
+                                    resolveTaskName(),
+                                    accessPlc);
+                            }
 
                             if (val != null) {
                                 cacheCtx.addResult(map,
@@ -1421,8 +1442,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                             // As optimization, mark as checked immediately
                             // for non-pessimistic if value is not null.
-                            if (val != null && !pessimistic())
+                            if (val != null && !pessimistic()) {
                                 txEntry.markValid();
+
+                                if (needReadVer) {
+                                    assert readVer != null;
+
+                                    txEntry.serializableReadVersion(readVer);
+                                }
+                            }
                         }
 
                         break; // While.
@@ -1532,9 +1560,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         if (log.isDebugEnabled())
             log.debug("Loading missed values for missed map: " + missedMap);
 
-        final Collection<KeyCacheObject> loaded = U.newHashSet(missedMap.size());
+        final Collection<KeyCacheObject> loaded =
+            readCommitted() ? U.<KeyCacheObject>newHashSet(missedMap.size()) : null;
 
-        final boolean needVer = optimistic() && serializable();
+        final boolean needReadVer = optimistic() && serializable();
 
         return new GridEmbeddedFuture<>(
             new C2<Boolean, Exception, Map<K, V>>() {
@@ -1555,27 +1584,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         }
                     }
 
-                    // In read-committed mode touch entries that have just been read.
-                    boolean touch = readCommitted();
+                    if (readCommitted()) {
+                        assert loaded != null;
 
-                    for (KeyCacheObject key : missedMap.keySet()) {
-                        if (loaded.contains(key))
-                            continue;
+                        Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
 
-                        GridCacheVersion ver = needVer ? IgniteTxEntry.SER_READ_NEW_ENTRY_VER : null;
+                        notFound.removeAll(loaded);
 
-                        onLoaded(key,
-                            null,
-                            ver,
-                            cacheCtx,
-                            map,
-                            missedMap,
-                            deserializePortable,
-                            skipVals,
-                            keepCacheObjects,
-                            loaded);
+                        // In read-committed mode touch entries that have just been read.
+                        for (KeyCacheObject key : notFound) {
+                            if (loaded.contains(key))
+                                continue;
 
-                        if (touch) {
                             IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
 
                             GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
@@ -1596,174 +1616,141 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 missedMap.keySet(),
                 deserializePortable,
                 skipVals,
-                needVer,
+                needReadVer,
                 new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
-                    @Override public void apply(KeyCacheObject key,
-                        @Nullable Object val,
-                        @Nullable GridCacheVersion loadVer) {
-                        onLoaded(key,
-                            val,
-                            loadVer,
-                            cacheCtx,
-                            map,
-                            missedMap,
-                            deserializePortable,
-                            skipVals,
-                            keepCacheObjects,
-                            loaded);
-                    }
-                })
-        );
-    }
+                    /** */
+                    private GridCacheVersion nextVer;
 
-    /**
-     * @param key Key.
-     * @param val Value.
-     * @param loadVer Entry version.
-     * @param cacheCtx Cache context.
-     * @param map Return map.
-     * @param missedMap Missed keys.
-     * @param deserializePortable Deserialize portable flag.
-     * @param skipVals Skip values flag.
-     * @param keepCacheObjects Keep cache objects flag.
-     * @param loaded Loaded values map.
-     */
-    private <K, V> void onLoaded(
-        KeyCacheObject key,
-        @Nullable Object val,
-        @Nullable GridCacheVersion loadVer,
-        GridCacheContext cacheCtx,
-        Map<K, V> map,
-        Map<KeyCacheObject, GridCacheVersion> missedMap,
-        final boolean deserializePortable,
-        boolean skipVals,
-        boolean keepCacheObjects,
-        Collection<KeyCacheObject> loaded) {
-        if (isRollbackOnly()) {
-            if (log.isDebugEnabled())
-                log.debug("Ignoring loaded value for read because transaction was rolled back: " +
-                    IgniteTxLocalAdapter.this);
+                    @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
+                        if (isRollbackOnly()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+                                    IgniteTxLocalAdapter.this);
 
-            return;
-        }
+                            return;
+                        }
 
-        GridCacheVersion ver = missedMap.get(key);
+                        GridCacheVersion ver = missedMap.get(key);
 
-        if (ver == null) {
-            if (log.isDebugEnabled())
-                log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+                        if (ver == null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
 
-            return;
-        }
+                            return;
+                        }
 
-        CacheObject cacheVal = cacheCtx.toCacheObject(val);
+                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
-        CacheObject visibleVal = cacheVal;
+                        CacheObject visibleVal = cacheVal;
 
-        IgniteTxKey txKey = cacheCtx.txKey(key);
+                        IgniteTxKey txKey = cacheCtx.txKey(key);
 
-        IgniteTxEntry txEntry = entry(txKey);
+                        IgniteTxEntry txEntry = entry(txKey);
 
-        if (txEntry != null) {
-            if (!readCommitted())
-                txEntry.readValue(cacheVal);
+                        if (txEntry != null) {
+                            if (!readCommitted())
+                                txEntry.readValue(cacheVal);
 
-            if (!F.isEmpty(txEntry.entryProcessors()))
-                visibleVal = txEntry.applyEntryProcessors(visibleVal);
-        }
+                            if (!F.isEmpty(txEntry.entryProcessors()))
+                                visibleVal = txEntry.applyEntryProcessors(visibleVal);
+                        }
 
-        // In pessimistic mode we hold the lock, so filter validation
-        // should always be valid.
-        if (pessimistic())
-            ver = null;
+                        // In pessimistic mode we hold the lock, so filter validation
+                        // should always be valid.
+                        if (pessimistic())
+                            ver = null;
 
-        // Initialize next version.
-        GridCacheVersion nextVer = cctx.versions().next(topologyVersion());
+                        // Initialize next version.
+                        if (nextVer == null)
+                            nextVer = cctx.versions().next(topologyVersion());
 
-        while (true) {
-            assert txEntry != null || readCommitted() || skipVals;
+                        while (true) {
+                            assert txEntry != null || readCommitted() || skipVals;
 
-            GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+                            GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
 
-            try {
-                // Must initialize to true since even if filter didn't pass,
-                // we still record the transaction value.
-                boolean set;
+                            try {
+                                // Must initialize to true since even if filter didn't pass,
+                                // we still record the transaction value.
+                                boolean set;
 
-                try {
-                    set = e.versionedValue(cacheVal, ver, nextVer);
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in transaction getAll method " +
-                            "(will try again): " + e);
+                                try {
+                                    set = e.versionedValue(cacheVal, ver, nextVer);
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got removed entry in transaction getAll method " +
+                                            "(will try again): " + e);
 
-                    if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
-                        U.error(log, "Inconsistent transaction state (entry got removed while " +
-                            "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+                                    if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
+                                        U.error(log, "Inconsistent transaction state (entry got removed while " +
+                                            "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
 
-                        setRollbackOnly();
+                                        setRollbackOnly();
 
-                        return;
-                    }
+                                        return;
+                                    }
 
-                    if (txEntry != null)
-                        txEntry.cached(entryEx(cacheCtx, txKey));
+                                    if (txEntry != null)
+                                        txEntry.cached(entryEx(cacheCtx, txKey));
 
-                    continue; // While loop.
-                }
+                                    continue; // While loop.
+                                }
 
-                // In pessimistic mode, we should always be able to set.
-                assert set || !pessimistic();
+                                // In pessimistic mode, we should always be able to set.
+                                assert set || !pessimistic();
 
-                if (readCommitted() || skipVals) {
-                    cacheCtx.evicts().touch(e, topologyVersion());
+                                if (readCommitted() || skipVals) {
+                                    cacheCtx.evicts().touch(e, topologyVersion());
 
-                    if (visibleVal != null) {
-                        cacheCtx.addResult(map,
-                            key,
-                            visibleVal,
-                            skipVals,
-                            keepCacheObjects,
-                            deserializePortable,
-                            false);
-                    }
-                }
-                else {
-                    assert txEntry != null;
+                                    if (visibleVal != null) {
+                                        cacheCtx.addResult(map,
+                                            key,
+                                            visibleVal,
+                                            skipVals,
+                                            keepCacheObjects,
+                                            deserializePortable,
+                                            false);
+                                    }
+                                }
+                                else {
+                                    assert txEntry != null;
 
-                    txEntry.setAndMarkValid(cacheVal);
+                                    txEntry.setAndMarkValid(cacheVal);
 
-                    if (optimistic() && serializable()) {
-                        assert loadVer != null;
+                                    if (needReadVer) {
+                                        assert loadVer != null;
 
-                        txEntry.serializableReadVersion(loadVer);
-                    }
+                                        txEntry.serializableReadVersion(loadVer);
+                                    }
 
-                    if (visibleVal != null) {
-                        cacheCtx.addResult(map,
-                            key,
-                            visibleVal,
-                            skipVals,
-                            keepCacheObjects,
-                            deserializePortable,
-                            false);
-                    }
-                }
+                                    if (visibleVal != null) {
+                                        cacheCtx.addResult(map,
+                                            key,
+                                            visibleVal,
+                                            skipVals,
+                                            keepCacheObjects,
+                                            deserializePortable,
+                                            false);
+                                    }
+                                }
 
-                if (val != null)
-                    loaded.add(key);
+                                if (readCommitted())
+                                    loaded.add(key);
 
-                if (log.isDebugEnabled())
-                    log.debug("Set value loaded from store into entry from transaction [set=" + set +
-                        ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+                                if (log.isDebugEnabled())
+                                    log.debug("Set value loaded from store into entry from transaction [set=" + set +
+                                        ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
 
-                break; // While loop.
-            }
-            catch (IgniteCheckedException ex) {
-                throw new IgniteException("Failed to put value for cache entry: " + e, ex);
-            }
-        }
+                                break; // While loop.
+                            }
+                            catch (IgniteCheckedException ex) {
+                                throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+                            }
+                        }
+                    }
+                })
+        );
     }
 
     /** {@inheritDoc} */
@@ -2066,14 +2053,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     /**
      * Checks filter for non-pessimistic transactions.
      *
-     * @param cached Cached entry.
+     * @param cctx Cache context.
+     * @param key Key.
+     * @param val Value.
      * @param filter Filter to check.
      * @return {@code True} if passed or pessimistic.
-     * @throws IgniteCheckedException If failed.
      */
-    private <K, V> boolean filter(GridCacheEntryEx cached,
-        CacheEntryPredicate[] filter) throws IgniteCheckedException {
-        return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter);
+    private boolean filter(
+        GridCacheContext cctx,
+        KeyCacheObject key,
+        CacheObject val,
+        CacheEntryPredicate[] filter) {
+        return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
     }
 
     /**
@@ -2097,7 +2088,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param skipStore Skip store flag.
      * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
      */
-    protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+    private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
         final GridCacheContext cacheCtx,
         Collection<?> keys,
         @Nullable GridCacheEntryEx cached,
@@ -2108,7 +2099,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         @Nullable Object[] invokeArgs,
         boolean retval,
         boolean lockOnly,
-        CacheEntryPredicate[] filter,
+        final CacheEntryPredicate[] filter,
         final GridCacheReturn ret,
         Collection<KeyCacheObject> enlisted,
         @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
@@ -2117,6 +2108,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     ) {
         assert cached == null || keys.size() == 1;
         assert cached == null || F.first(keys).equals(cached.key());
+        assert retval || invokeMap == null;
 
         try {
             addActiveCache(cacheCtx);
@@ -2131,6 +2123,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         Set<KeyCacheObject> missedForLoad = null;
 
+        final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+        final boolean needReadVer = (retval || hasFilters) && (optimistic() && serializable());
+
         try {
             // Set transform flag for transaction.
             if (invokeMap != null)
@@ -2210,24 +2205,40 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     ", locNodeId=" + cctx.localNodeId() + ']');
 
                             CacheObject old = null;
-
-                            boolean readThrough = !skipStore && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+                            GridCacheVersion readVer = null;
 
                             if (optimistic() && !implicit()) {
                                 try {
-                                    // Should read through if filter is specified.
-                                    old = entry.innerGet(this,
-                                        /*swap*/false,
-                                        /*read-through*/readThrough && cacheCtx.loadPreviousValue(),
-                                        /*fail-fast*/false,
-                                        /*unmarshal*/retval,
-                                        /*metrics*/retval,
-                                        /*events*/retval,
-                                        /*temporary*/false,
-                                        CU.subjectId(this, cctx),
-                                        entryProcessor,
-                                        resolveTaskName(),
-                                        null);
+                                    if (needReadVer) {
+                                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
+                                            /*swap*/false,
+                                            /*unmarshal*/retval,
+                                            /*metrics*/retval,
+                                            /*events*/retval,
+                                            CU.subjectId(this, cctx),
+                                            entryProcessor,
+                                            resolveTaskName(),
+                                            null);
+
+                                        if (res != null) {
+                                            old = res.get1();
+                                            readVer = res.get2();
+                                        }
+                                    }
+                                    else {
+                                        old = entry.innerGet(this,
+                                            /*swap*/false,
+                                            /*read-through*/false,
+                                            /*fail-fast*/false,
+                                            /*unmarshal*/retval,
+                                            /*metrics*/retval,
+                                            /*events*/retval,
+                                            /*temporary*/false,
+                                            CU.subjectId(this, cctx),
+                                            entryProcessor,
+                                            resolveTaskName(),
+                                            null);
+                                    }
                                 }
                                 catch (ClusterTopologyCheckedException e) {
                                     entry.context().evicts().touch(entry, topologyVersion());
@@ -2243,12 +2254,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             else
                                 old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
 
-                            if (!filter(entry, filter)) {
+                            if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
                                 skipped = skip(skipped, cacheKey);
 
                                 ret.set(cacheCtx, old, false);
 
-                                if (!readCommitted() && old != null) {
+                                if (!readCommitted()) {
                                     // Enlist failed filters as reads for non-read-committed mode,
                                     // so future ops will get the same values.
                                     txEntry = addEntry(READ,
@@ -2265,9 +2276,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         skipStore);
 
                                     txEntry.markValid();
+
+                                    if (needReadVer) {
+                                        assert readVer != null;
+
+                                        txEntry.serializableReadVersion(readVer);
+                                    }
                                 }
 
-                                if (readCommitted() || old == null)
+                                if (readCommitted())
                                     cacheCtx.evicts().touch(entry, topologyVersion());
 
                                 break; // While.
@@ -2298,7 +2315,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 txEntry.markValid();
 
                                 if (old == null) {
-                                    boolean load = retval && !readThrough;
+                                    boolean load = retval || hasFilters;
 
                                     if (load) {
                                         if (missedForLoad == null)
@@ -2317,6 +2334,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     }
                                 }
                                 else {
+                                    if (needReadVer) {
+                                        assert readVer != null;
+
+                                        txEntry.serializableReadVersion(readVer);
+                                    }
+
                                     if (retval && !transform)
                                         ret.set(cacheCtx, old, true);
                                     else {
@@ -2362,7 +2385,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 else {
                     if (entryProcessor == null && txEntry.op() == TRANSFORM)
                         throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
-                            "transaction after transform closure is applied): " + key);
+                            "transaction after EntryProcessor is applied): " + key);
 
                     GridCacheEntryEx entry = txEntry.cached();
 
@@ -2371,7 +2394,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     boolean del = txEntry.op() == DELETE && rmv;
 
                     if (!del) {
-                        if (!filter(entry, filter)) {
+                        if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
                             skipped = skip(skipped, cacheKey);
 
                             ret.set(cacheCtx, v, false);
@@ -2439,7 +2462,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 missedForLoad,
                 deserializePortables(cacheCtx),
                 /*skip values*/false,
-                false,
+                needReadVer,
                 new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
                     @Override public void apply(KeyCacheObject key,
                         @Nullable Object val,
@@ -2451,6 +2474,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                         assert e != null;
 
+                        if (needReadVer) {
+                            assert loadVer != null;
+
+                            e.serializableReadVersion(loadVer);
+                        }
+
                         CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
                         if (e.op() == TRANSFORM) {
@@ -2470,8 +2499,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                             addInvokeResult(e, cacheVal, ret, ver);
                         }
-                        else
-                            ret.set(cacheCtx, cacheVal, true);
+                        else {
+                            boolean success = hasFilters ? isAll(e.context(), key, cacheVal, filter) : true;
+
+                            ret.set(cacheCtx, cacheVal, success);
+                        }
                     }
                 });
 
@@ -2491,6 +2523,31 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cctx Cache context.
+     * @param key Key.
+     * @param val Value.
+     * @param filter Filter.
+     * @return {@code True} if filter passed.
+     */
+    private boolean isAll(GridCacheContext cctx,
+        KeyCacheObject key,
+        CacheObject val,
+        CacheEntryPredicate[] filter) {
+        GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
+            @Nullable @Override public CacheObject peekVisibleValue() {
+                return rawGet();
+            }
+        };
+
+        for (CacheEntryPredicate p0 : filter) {
+            if (p0 != null && !p0.apply(e))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * Post lock processing for put or remove.
      *
      * @param cacheCtx Context.