You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/24 04:41:01 UTC

[3/4] ignite git commit: Merge branches 'ignite-843' and 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-843

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 221b230..eb7c78f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -155,7 +155,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable UUID subjId,
         String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -183,7 +184,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             });
         }
 
-        AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ?
+            (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+            tx.topologyVersion();
 
         subjId = ctx.subjectIdPerCall(subjId, opCtx);
 
@@ -197,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             taskName,
             deserializePortable,
             skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
-            skipVals);
+            skipVals,
+            canRemap);
     }
 
     /** {@inheritDoc} */
@@ -226,7 +230,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param skipVals Skip values flag.
      * @return Loaded values.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
+    public IgniteInternalFuture<Map<K, V>> loadAsync(
+        @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean reload,
         boolean forcePrimary,
@@ -235,7 +240,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -340,7 +346,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             taskName,
             deserializePortable,
             expiryPlc,
-            skipVals);
+            skipVals,
+            canRemap);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index c784948..90ca8df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1125,8 +1125,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      * @return Topology exception with user-friendly message.
      */
     private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
-        return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
-            "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+        ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+            "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+        return topEx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbf6b40..1a90de9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
     implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
     /** */
+    private static final int DUMP_PENDING_OBJECTS_THRESHOLD =
+        IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Dummy flag. */
@@ -711,6 +715,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (log.isDebugEnabled())
                     log.debug("Before waiting for partition release future: " + this);
 
+                int dumpedObjects = 0;
+
                 while (true) {
                     try {
                         partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
@@ -719,7 +725,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     }
                     catch (IgniteFutureTimeoutCheckedException ignored) {
                         // Print pending transactions and locks that might have led to hang.
-                        dumpPendingObjects();
+                        if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+                            dumpPendingObjects();
+
+                            dumpedObjects++;
+                        }
                     }
                 }
 
@@ -731,6 +741,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
 
+                dumpedObjects = 0;
+
                 while (true) {
                     try {
                         locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
@@ -738,22 +750,26 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                         break;
                     }
                     catch (IgniteFutureTimeoutCheckedException ignored) {
-                        U.warn(log, "Failed to wait for locks release future. " +
-                            "Dumping pending objects that might be the cause: " + cctx.localNodeId());
+                        if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+                            U.warn(log, "Failed to wait for locks release future. " +
+                                "Dumping pending objects that might be the cause: " + cctx.localNodeId());
 
-                        U.warn(log, "Locked keys:");
+                            U.warn(log, "Locked keys:");
 
-                        for (IgniteTxKey key : cctx.mvcc().lockedKeys())
-                            U.warn(log, "Locked key: " + key);
+                            for (IgniteTxKey key : cctx.mvcc().lockedKeys())
+                                U.warn(log, "Locked key: " + key);
 
-                        for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
-                            U.warn(log, "Locked near key: " + key);
+                            for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
+                                U.warn(log, "Locked near key: " + key);
 
-                        Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
-                            cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+                            Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
+                                cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
 
-                        for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
-                            U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+                            for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+                                U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+                            
+                            dumpedObjects++;
+                        }
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 041f83a..2bf5365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -364,7 +364,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -387,7 +388,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
             deserializePortable,
             skipVals ? null : opCtx != null ? opCtx.expiry() : null,
             skipVals,
-            opCtx != null && opCtx.skipStore());
+            opCtx != null && opCtx.skipStore(),
+            canRemap);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 79b7c1a..845be38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -203,13 +203,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         return (IgniteInternalFuture)loadAsync(tx,
             keys,
             reload,
-            false,
+            /*force primary*/false,
             subjId,
             taskName,
-            true,
-            null,
+            /*deserialize portable*/true,
+            /*expiry policy*/null,
             skipVals,
-            /*skip store*/false);
+            /*skip store*/false,
+            /*can remap*/true);
     }
 
     /**
@@ -234,7 +235,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         boolean deserializePortable,
         @Nullable ExpiryPolicy expiryPlc,
         boolean skipVal,
-        boolean skipStore
+        boolean skipStore,
+        boolean canRemap
     ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -253,7 +255,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             taskName,
             deserializePortable,
             expiry,
-            skipVal);
+            skipVal,
+            canRemap);
 
         // init() will register future for responses if future has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 9e8d76b..6f4f15e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -333,7 +333,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
             true,
             null,
             false,
-            /*skip store*/false).get().get(keyValue(false));
+            /*skip store*/false,
+            /*can remap*/true
+        ).get().get(keyValue(false));
     }
 
     /**
@@ -433,6 +435,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     @Override public GridCacheMvccCandidate addLocal(
         long threadId,
         GridCacheVersion ver,
+        AffinityTopologyVersion topVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -441,6 +444,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
             null,
             threadId,
             ver,
+            topVer,
             timeout,
             reenter,
             tx,
@@ -454,6 +458,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      * @param dhtNodeId DHT node ID.
      * @param threadId Owning thread ID.
      * @param ver Lock version.
+     * @param topVer Topology version.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Transaction flag.
@@ -465,6 +470,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         @Nullable UUID dhtNodeId,
         long threadId,
         GridCacheVersion ver,
+        AffinityTopologyVersion topVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -513,6 +519,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 tx,
                 implicitSingle);
 
+            cand.topologyVersion(topVer);
+
             owner = mvcc.anyOwner();
 
             boolean emptyAfter = mvcc.isEmpty();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 2017654..951fddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -62,7 +62,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
 
     /** Context. */
-    private GridCacheContext<K, V> cctx;
+    private final GridCacheContext<K, V> cctx;
 
     /** Keys. */
     private Collection<KeyCacheObject> keys;
@@ -106,6 +106,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     /** Expiry policy. */
     private IgniteCacheExpiryPolicy expiryPlc;
 
+    /** Whether get can be remapped on topology change; if {@code false}, get is done on a locked topology version. */
+    private final boolean canRemap;
+
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -131,7 +134,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -148,6 +152,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         this.deserializePortable = deserializePortable;
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
+        this.canRemap = canRemap;
 
         futId = IgniteUuid.randomUuid();
 
@@ -161,7 +166,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * Initializes future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ?
+            (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+            tx.topologyVersion();
 
         map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
@@ -327,7 +334,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                             remapKeys.add(key);
                     }
 
-                    AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+                    AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
 
                     assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -435,7 +442,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         taskName,
                         expiryPlc);
 
-                ClusterNode primary = null;
+                ClusterNode affNode = null;
 
                 if (v == null && allowLocRead && cctx.affinityNode()) {
                     GridDhtCacheAdapter<K, V> dht = cache().dht();
@@ -472,16 +479,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                                 near.metrics0().onRead(true);
                         }
                         else {
-                            primary = cctx.affinity().primary(key, topVer);
+                            affNode = affinityNode(key, topVer);
 
-                            if (primary == null) {
+                            if (affNode == null) {
                                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                                     "(all partition nodes left the grid)."));
 
                                 return savedVers;
                             }
 
-                            if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+                            if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
                                 near.metrics0().onRead(false);
                         }
                     }
@@ -507,10 +514,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
                 }
                 else {
-                    if (primary == null) {
-                        primary = cctx.affinity().primary(key, topVer);
+                    if (affNode == null) {
+                        affNode = affinityNode(key, topVer);
 
-                        if (primary == null) {
+                        if (affNode == null) {
                             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                                 "(all partition nodes left the grid)."));
 
@@ -527,13 +534,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
                     savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
 
-                    LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(primary);
+                    LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
 
                     if (keys != null && keys.containsKey(key)) {
                         if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
                             onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
                                 MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
-                                "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']'));
+                                "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
 
                             return savedVers;
                         }
@@ -545,10 +552,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key)))
                         addRdr = true;
 
-                    LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(primary);
+                    LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(affNode);
 
                     if (old == null)
-                        mappings.put(primary, old = new LinkedHashMap<>(3, 1f));
+                        mappings.put(affNode, old = new LinkedHashMap<>(3, 1f));
 
                     old.put(key, addRdr);
                 }
@@ -579,6 +586,28 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     }
 
     /**
+     * Affinity node to send get request to.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node to get key from, or {@code null} if there is no such node.
+     */
+    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : affNodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
+
+    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {
@@ -752,37 +781,46 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            final AffinityTopologyVersion updTopVer =
-                new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+            // Try getting value from alive nodes.
+            if (!canRemap) {
+                // Remap on the same topology version.
+                map(keys.keySet(), F.t(node, keys), topVer);
 
-            final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                cctx.kernalContext().config().getNetworkTimeout(),
-                updTopVer,
-                e);
-
-            cctx.affinity().affinityReadyFuture(updTopVer).listen(
-                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        if (timeout.finish()) {
-                            cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
-                            try {
-                                fut.get();
-
-                                // Remap.
-                                map(keys.keySet(), F.t(node, keys), updTopVer);
-
-                                onDone(Collections.<K, V>emptyMap());
-                            }
-                            catch (IgniteCheckedException e) {
-                                GridNearGetFuture.this.onDone(e);
+                onDone(Collections.<K, V>emptyMap());
+            }
+            else {
+                final AffinityTopologyVersion updTopVer =
+                    new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+                    cctx.kernalContext().config().getNetworkTimeout(),
+                    updTopVer,
+                    e);
+
+                cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            if (timeout.finish()) {
+                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+
+                                try {
+                                    fut.get();
+
+                                    // Remap.
+                                    map(keys.keySet(), F.t(node, keys), updTopVer);
+
+                                    onDone(Collections.<K, V>emptyMap());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    GridNearGetFuture.this.onDone(e);
+                                }
                             }
                         }
                     }
-                }
-            );
+                );
 
-            cctx.kernalContext().timeout().addTimeoutObject(timeout);
+                cctx.kernalContext().timeout().addTimeoutObject(timeout);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 3d28018..2815194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -307,6 +307,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             dhtNodeId,
             threadId,
             lockVer,
+            topVer,
             timeout,
             !inTx(),
             inTx(),
@@ -319,9 +320,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             txEntry.cached(entry);
         }
 
-        if (c != null)
-            c.topologyVersion(topVer);
-
         synchronized (mux) {
             entries.add(entry);
         }
@@ -1234,8 +1232,12 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      * @return Topology exception with user-friendly message.
      */
     private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
-        return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
-            "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+        ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+            "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+        return topEx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e71dd62..8e66cb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -101,7 +101,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                 MiniFuture f = (MiniFuture) fut;
 
                 if (f.node().id().equals(nodeId)) {
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                        nodeId);
+
+                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                    f.onResult(e);
 
                     found = true;
                 }
@@ -563,8 +568,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
             }
+            catch (ClusterTopologyCheckedException e) {
+                e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                fut.onResult(e);
+            }
             catch (IgniteCheckedException e) {
-                // Fail the whole thing.
                 fut.onResult(e);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index f51b4b8..9ce5bd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -72,7 +72,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             MiniFuture f = (MiniFuture)fut;
 
             if (f.node().id().equals(nodeId)) {
-                f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                    nodeId);
+
+                e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                f.onNodeLeft(e);
 
                 found = true;
             }
@@ -224,6 +229,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                     cctx.io().send(node, req, tx.ioPolicy());
                 }
                 catch (ClusterTopologyCheckedException e) {
+                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(topVer));
+
                     fut.onNodeLeft(e);
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 696acfb..a1f1383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -101,7 +101,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         @Nullable UUID subjId,
         String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -142,7 +143,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             deserializePortable,
             skipVals ? null : opCtx != null ? opCtx.expiry() : null,
             skipVals,
-            skipStore);
+            skipStore,
+            canRemap);
     }
 
     /**
@@ -172,7 +174,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             tx.resolveTaskName(),
             deserializePortable,
             expiryPlc,
-            skipVals);
+            skipVals,
+            /*can remap*/true);
 
         // init() will register future for responses if it has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index cb391e4..5ff7345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -313,7 +313,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             });
         }
         else if (cacheCtx.isColocated()) {
-            return cacheCtx.colocated().loadAsync(keys,
+            return cacheCtx.colocated().loadAsync(
+                keys,
                 readThrough,
                 /*reload*/false,
                 /*force primary*/false,
@@ -322,7 +323,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 resolveTaskName(),
                 deserializePortable,
                 accessPolicy(cacheCtx, keys),
-                skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+                skipVals,
+                /*can remap*/true
+            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
                     @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
                         try {
                             Map<Object, Object> map = f.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index b418500..b24c62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
      * @param miniId Mini future ID.
      * @param dhtVer DHT version.
      * @param writeVer Write version.
-     * @param invalidParts Invalid partitions.
      * @param retVal Return value.
      * @param err Error.
      * @param clientRemapVer Not {@code null} if client node should remap transaction.
@@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         IgniteUuid miniId,
         GridCacheVersion dhtVer,
         GridCacheVersion writeVer,
-        Collection<Integer> invalidParts,
         GridCacheReturn retVal,
         Throwable err,
         AffinityTopologyVersion clientRemapVer
@@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         this.miniId = miniId;
         this.dhtVer = dhtVer;
         this.writeVer = writeVer;
-        this.invalidParts = invalidParts;
         this.retVal = retVal;
         this.clientRemapVer = clientRemapVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index ea59f1f..6c04761 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -261,7 +261,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
 
                 // Allow next lock in the thread to proceed.
                 if (!cand.used()) {
-                    GridLocalCacheEntry e = (GridLocalCacheEntry)cctx.cache().peekEx(cand.key());
+                    GridCacheContext cctx0 = cand.parent().context();
+
+                    GridLocalCacheEntry e = (GridLocalCacheEntry)cctx0.cache().peekEx(cand.key());
 
                     // At this point candidate may have been removed and entry destroyed,
                     // so we check for null.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 8dd3276..3dc5946 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -458,7 +458,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         @Nullable UUID subjId,
         final String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         A.notNull(keys, "keys");
 
@@ -570,8 +571,18 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         if (success || !storeEnabled)
             return vals;
 
-        return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable,
-            false, expiry, skipVals).get();
+        return getAllAsync(
+            keys,
+            opCtx == null || !opCtx.skipStore(),
+            null,
+            false,
+            subjId,
+            taskName,
+            deserializePortable,
+            /*force primary*/false,
+            expiry,
+            skipVals,
+            /*can remap*/true).get();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 4f21ad7..4e43d97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     /**
      * @return Invalid partitions.
      */
-    public Set<Integer> invalidPartitions();
+    public Map<Integer, Set<Integer>> invalidPartitions();
 
     /**
      * Gets owned version for near remote transaction.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index e9fdd22..b8beb15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     private AtomicBoolean preparing = new AtomicBoolean();
 
     /** */
-    private Set<Integer> invalidParts = new GridLeanSet<>();
+    private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
 
     /**
      * Transaction state. Note that state is not protected, as we want to
@@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public Set<Integer> invalidPartitions() {
+    @Override public Map<Integer, Set<Integer>> invalidPartitions() {
         return invalidParts;
     }
 
     /** {@inheritDoc} */
     @Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) {
-        invalidParts.add(part);
+        Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
+
+        if (parts == null) {
+            parts = new GridLeanSet<>();
+
+            invalidParts.put(cacheCtx.cacheId(), parts);
+        }
+
+        parts.add(part);
 
         if (log.isDebugEnabled())
-            log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']');
+            log.debug("Added invalid partition for transaction [cache=" + cacheCtx.name() + ", part=" + part +
+                ", tx=" + this + ']');
     }
 
     /** {@inheritDoc} */
@@ -1779,7 +1788,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public Set<Integer> invalidPartitions() {
+        @Override public Map<Integer, Set<Integer>> invalidPartitions() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3c792f6..e9070a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     @GridDirectTransient
     private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
 
+    /** Transient field for calculated entry processor value. */
+    @GridDirectTransient
+    private CacheObject entryProcessorCalcVal;
+
     /** Transform closure bytes. */
     @GridToStringExclude
     private byte[] transformClosBytes;
@@ -787,6 +791,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return expiryPlc;
     }
 
+    /**
+     * @return Value calculated by the entry processor, or {@code null} if it has not been set.
+     */
+    public CacheObject entryProcessorCalculatedValue() {
+        return entryProcessorCalcVal;
+    }
+
+    /**
+     * @param entryProcessorCalcVal Entry processor calculated value to keep in the {@code @GridDirectTransient} field.
+     */
+    public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+        this.entryProcessorCalcVal = entryProcessorCalcVal;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e481e25..227cb34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -290,7 +290,6 @@ public class IgniteTxHandler {
                         req.version(),
                         null,
                         null,
-                        null,
                         top.topologyVersion());
 
                     try {
@@ -803,7 +802,7 @@ public class IgniteTxHandler {
                 res.nearEvicted(nearTx.evicted());
 
             if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
-                res.invalidPartitions(dhtTx.invalidPartitions());
+                res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
 
             if (req.onePhaseCommit()) {
                 assert req.last();
@@ -1154,7 +1153,7 @@ public class IgniteTxHandler {
             if (req.last())
                 tx.state(PREPARED);
 
-            res.invalidPartitions(tx.invalidPartitions());
+            res.invalidPartitionsByCacheId(tx.invalidPartitions());
 
             if (tx.empty() && req.last()) {
                 tx.rollback();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b354fed..a32e7b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2216,8 +2216,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         missedForLoad.add(cacheKey);
                                     }
                                     else {
-                                        assert !transform;
-                                        assert txEntry.op() != TRANSFORM;
+                                        assert !implicit() || !transform : this;
+                                        assert txEntry.op() != TRANSFORM : txEntry;
 
                                         if (retval)
                                             ret.set(cacheCtx, null, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 78b09e6..0f43b8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -651,6 +651,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException {
         ServiceConfiguration cfg = dep.configuration();
 
+        Object nodeFilter = cfg.getNodeFilter();
+
+        if (nodeFilter != null)
+            ctx.resource().injectGeneric(nodeFilter);
+
         int totalCnt = cfg.getTotalCount();
         int maxPerNodeCnt = cfg.getMaxPerNodeCount();
         String cacheName = cfg.getCacheName();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f8c4c7e..c9be652 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.mxbean.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -580,7 +581,14 @@ public abstract class IgniteUtils {
 
         m.put(ClusterTopologyCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
             @Override public IgniteException apply(IgniteCheckedException e) {
-                return new ClusterTopologyException(e.getMessage(), e);
+                ClusterTopologyException topEx = new ClusterTopologyException(e.getMessage(), e);
+
+                ClusterTopologyCheckedException checked = (ClusterTopologyCheckedException)e;
+
+                if (checked.retryReadyFuture() != null)
+                    topEx.retryReadyFuture(new IgniteFutureImpl<>(checked.retryReadyFuture()));
+
+                return topEx;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index f3bcab0..5c43ad9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -4085,6 +4085,20 @@ public class GridFunc {
      * @param val Value to find.
      * @return {@code True} if array contains given value.
      */
+    public static boolean contains(Integer[] arr, int val) {
+        for (Integer el : arr) {
+            if (el != null && el == val) // Null elements never match; unboxing them would throw NPE.
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param arr Array.
+     * @param val Value to find.
+     * @return {@code True} if array contains given value.
+     */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public static boolean contains(long[] arr, long val) {
         for (int i = 0; i < arr.length; i++) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index b93c547..2a07879 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -99,6 +99,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
     /** Local address */
     private String locAddr;
 
+    /** Time to live. */
+    private Integer ttl;
+
     /** */
     @GridToStringExclude
     private Collection<AddressSender> addrSnds;
@@ -223,6 +226,32 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
         return locAddr;
     }
 
+    /**
+     * Sets the default time-to-live for multicast packets sent out on this
+     * IP finder in order to control the scope of the multicast.
+     * <p>
+     * The TTL has to be in the range {@code 0 <= TTL <= 255}.
+     * <p>
+     * If TTL is {@code 0}, packets are not transmitted on the network,
+     * but may be delivered locally.
+     *
+     * @param ttl Time to live, expected to be within {@code 0..255}.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setTimeToLive(int ttl) {
+        this.ttl = ttl;
+    }
+
+    /**
+     * Gets the default time-to-live for multicast packets sent out on this
+     * IP finder.
+     *
+     * @return Time to live, or {@code -1} if it has not been configured.
+     */
+    public int getTimeToLive() {
+        return ttl != null ? ttl : -1; // ttl is a nullable Integer; unboxing null would throw NPE.
+    }
+
     /** {@inheritDoc} */
     @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
         // If IGNITE_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from
@@ -245,6 +274,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
             throw new IgniteSpiException("Invalid number of address request attempts, " +
                 "value greater than zero is expected: " + addrReqAttempts);
 
+        if (ttl != null && (ttl < 0 || ttl > 255))
+            throw new IgniteSpiException("Time-to-live value is out of 0 <= TTL <= 255 range: " + ttl);
+
         if (F.isEmpty(getRegisteredAddresses()))
             U.warn(log, "TcpDiscoveryMulticastIpFinder has no pre-configured addresses " +
                 "(it is recommended in production to specify at least one address in " +
@@ -453,6 +485,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
                     sock.setSoTimeout(resWaitTime);
 
+                    if (ttl != null)
+                        sock.setTimeToLive(ttl);
+
                     reqPckt.setData(MSG_ADDR_REQ_DATA);
 
                     try {
@@ -722,6 +757,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
             sock.joinGroup(mcastGrp);
 
+            if (ttl != null)
+                sock.setTimeToLive(ttl);
+
             return sock;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml b/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
index 91d77cd..3e3d6e0 100644
--- a/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
+++ b/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
@@ -180,6 +180,14 @@
                     <property name="javaName" value="name"/>
                     <property name="javaType" value="java.lang.String"/>
                 </bean>
+                <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata">
+                    <property name="databaseName" value="salary"/>
+                    <property name="databaseType">
+                        <util:constant static-field="java.sql.Types.INTEGER"/>
+                    </property>
+                    <property name="javaName" value="salary"/>
+                    <property name="javaType" value="java.lang.Integer"/>
+                </bean>
             </list>
         </property>
     </bean>

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index 182d3bc..68a77dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -183,12 +183,24 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
             // No-op.
         }
 
-        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))");
-        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))");
-        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))");
-        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))");
-        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))");
-        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person_Complex (id integer not null, org_id integer not null, city_id integer not null, name varchar(50), PRIMARY KEY(id))");
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " +
+            "name varchar(50), salary integer, PRIMARY KEY(id))");
 
         conn.commit();
 
@@ -238,7 +250,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
         U.closeQuiet(prnStmt);
 
-        PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name) VALUES (?, ?, ?, ?)");
+        PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)");
 
         for (int i = 0; i < PERSON_CNT; i++) {
             prnComplexStmt.setInt(1, i);
@@ -246,6 +258,11 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
             prnComplexStmt.setInt(3, i % 100);
             prnComplexStmt.setString(4, "name" + i);
 
+            if (i > 0)
+                prnComplexStmt.setInt(5, 1000 + i * 500);
+            else // Add person with null salary
+                prnComplexStmt.setNull(5, java.sql.Types.INTEGER);
+
             prnComplexStmt.addBatch();
         }
 
@@ -274,7 +291,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
                     assert key.getId() == val.getId();
                     assert key.getOrgId() == val.getOrgId();
-                    assert ("name"  + key.getId()).equals(val.getName());
+                    assertEquals("name"  + key.getId(), val.getName());
 
                     prnComplexKeys.add((PersonComplexKey)k);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
index eac7669..9483545 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
@@ -190,7 +190,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
                     if (rnd.nextBoolean())
                         cache.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
                     else
-                        cache.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+                        cache.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, 1));
                 }
 
                 return null;
@@ -209,7 +209,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
                     if (rnd.nextBoolean())
                         cache.putIfAbsent(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
                     else
-                        cache.putIfAbsent(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+                        cache.putIfAbsent(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, i));
                 }
 
                 return null;
@@ -248,7 +248,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
                         if (rnd.nextBoolean())
                             map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
                         else
-                            map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+                            map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, 1));
                     }
 
                     IgniteCache<Object, Object> cache = jcache();
@@ -273,17 +273,17 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
                     IgniteCache<PersonKey, Person> cache = jcache();
 
                     try (Transaction tx = grid().transactions().txStart()) {
-                        cache.put(new PersonKey(1), new Person(1, rnd.nextInt(), "Name" + 1));
-                        cache.put(new PersonKey(2), new Person(2, rnd.nextInt(), "Name" + 2));
-                        cache.put(new PersonKey(3), new Person(3, rnd.nextInt(), "Name" + 3));
+                        cache.put(new PersonKey(1), new Person(1, rnd.nextInt(), "Name" + 1, 1));
+                        cache.put(new PersonKey(2), new Person(2, rnd.nextInt(), "Name" + 2, 2));
+                        cache.put(new PersonKey(3), new Person(3, rnd.nextInt(), "Name" + 3, 3));
 
                         cache.get(new PersonKey(1));
                         cache.get(new PersonKey(4));
 
                         Map<PersonKey, Person> map =  U.newHashMap(2);
 
-                        map.put(new PersonKey(5), new Person(5, rnd.nextInt(), "Name" + 5));
-                        map.put(new PersonKey(6), new Person(6, rnd.nextInt(), "Name" + 6));
+                        map.put(new PersonKey(5), new Person(5, rnd.nextInt(), "Name" + 5, 5));
+                        map.put(new PersonKey(6), new Person(6, rnd.nextInt(), "Name" + 6, 6));
 
                         cache.putAll(map);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
index 1c4b9a7..95c83b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
@@ -37,6 +37,9 @@ public class Person implements Serializable {
     /** Value for name. */
     private String name;
 
+    /** Value for salary. */
+    private Integer salary;
+
     /**
      * Empty constructor.
      */
@@ -50,11 +53,13 @@ public class Person implements Serializable {
     public Person(
         Integer id,
         Integer orgId,
-        String name
+        String name,
+        Integer salary
     ) {
         this.id = id;
         this.orgId = orgId;
         this.name = name;
+        this.salary = salary;
     }
 
     /**
@@ -111,6 +116,25 @@ public class Person implements Serializable {
         this.name = name;
     }
 
+
+    /**
+     * Gets salary.
+     *
+     * @return Value for salary.
+     */
+    public Integer getSalary() {
+        return salary;
+    }
+
+    /**
+     * Sets salary.
+     *
+     * @param salary New value for salary.
+     */
+    public void setSalary(Integer salary) {
+        this.salary = salary;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
new file mode 100644
index 0000000..e5e6d72
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
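+ * Runs randomly chosen transactional operations on one or two caches and verifies the resulting cache data.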
+ */
+public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every test grid. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE1 = "cache1";
+
+    /** */
+    private static final String CACHE2 = "cache2";
+
+    /** Total number of grids; the grid with the last index is configured as a client. */
+    private static final int GRID_CNT = 5;
+
+    /** Number of distinct keys; random keys are drawn from {@code [0, KEY_RANGE)}. */
+    private static final int KEY_RANGE = 1000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); // Every grid gets the same static IP finder.
+
+        if (gridName.equals(getTestGridName(GRID_CNT - 1))) // Only the grid with the last index...
+            cfg.setClientMode(true); // ...is switched to client mode.
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(GRID_CNT - 1); // Presumably starts grids 0..GRID_CNT-2 (servers) - TODO confirm.
+
+        startGrid(GRID_CNT - 1); // Index that getConfiguration() switches to client mode.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids(); // Counterpart of the grid start-up done in beforeTestsStarted().
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOperations() throws Exception {
+        txOperations(PARTITIONED, FULL_SYNC, false, false); // crossCacheTx = false, fairAff = false.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheTxOperations() throws Exception {
+        txOperations(PARTITIONED, FULL_SYNC, true, false); // crossCacheTx = true, fairAff = false.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheTxOperationsPrimarySync() throws Exception {
+        txOperations(PARTITIONED, PRIMARY_SYNC, true, false); // crossCacheTx = true, fairAff = false.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testCrossCacheTxOperationsFairAffinity() throws Exception { // NOTE(review): '_' prefix presumably disables the test - confirm.
+        txOperations(PARTITIONED, FULL_SYNC, true, true); // crossCacheTx = true, fairAff = true.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheTxOperationsReplicated() throws Exception {
+        txOperations(REPLICATED, FULL_SYNC, true, false); // crossCacheTx = true, fairAff = false.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception {
+        txOperations(REPLICATED, PRIMARY_SYNC, true, false); // crossCacheTx = true, fairAff = false.
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cacheMode Cache mode.
+     * @param writeSync Write synchronization mode.
+     * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}.
+     * @return Configuration of a {@code TRANSACTIONAL} cache with the given name, mode and write synchronization.
+     */
+    private CacheConfiguration cacheConfiguration(String name,
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode writeSync,
+        boolean fairAff) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL); // The test always uses transactional caches.
+        ccfg.setWriteSynchronizationMode(writeSync);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(1); // A backup count is set only for PARTITIONED caches.
+
+        ccfg.setAffinity(fairAff ? new FairAffinityFunction() : new RendezvousAffinityFunction());
+
+        return ccfg;
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param writeSync Write synchronization mode.
+     * @param crossCacheTx If {@code true} uses cross cache transaction.
+     * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}.
+     * @throws Exception If failed.
+     */
+    private void txOperations(CacheMode cacheMode,
+        CacheWriteSynchronizationMode writeSync,
+        boolean crossCacheTx,
+        boolean fairAff) throws Exception {
+        Ignite ignite = ignite(0);
+
+        try {
+            ignite.createCache(cacheConfiguration(CACHE1, cacheMode, writeSync, fairAff)); // Both caches share the same settings.
+            ignite.createCache(cacheConfiguration(CACHE2, cacheMode, writeSync, fairAff));
+
+            txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, false); // Last argument: false = grid 0, true = client node.
+            txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, true);
+
+            txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, false); // Only REPEATABLE_READ isolation is exercised.
+            txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, true);
+        }
+        finally {
+            ignite.destroyCache(CACHE1); // Caches are destroyed even when a run above fails.
+            ignite.destroyCache(CACHE2);
+        }
+    }
+
+    /**
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param crossCacheTx If {@code true} each transaction also performs an operation on the second cache.
+     * @param client If {@code true} transactions are started from the client node (last grid), otherwise from grid 0.
+     * @throws Exception If failed.
+     */
+    private void txOperations(TransactionConcurrency concurrency,
+        TransactionIsolation isolation,
+        boolean crossCacheTx,
+        boolean client) throws Exception {
+        final Map<TestKey, TestValue> expData1 = new HashMap<>(); // Expected contents of CACHE1.
+        final Map<TestKey, TestValue> expData2 = new HashMap<>(); // Expected contents of CACHE2.
+
+        Ignite ignite = client ? ignite(GRID_CNT - 1) : ignite(0);
+
+        assertEquals(client, (boolean)ignite.configuration().isClientMode());
+
+        final List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>();
+        final List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>();
+
+        for (int i = 0; i < GRID_CNT; i++) { // Cache instances from every grid, used for the final check.
+            caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1));
+            caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2));
+        }
+
+        IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1);
+        IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2);
+
+        assertNotNull(cache1);
+        assertNotNull(cache2);
+        assertNotSame(cache1, cache2);
+
+        try {
+            Random rnd = new Random();
+
+            long seed = System.currentTimeMillis(); // Time-based seed; it is written to the log below.
+
+            rnd.setSeed(seed);
+
+            log.info("Test tx operations [concurrency=" + concurrency +
+                ", isolation=" + isolation +
+                ", client=" + client +
+                ", seed=" + seed + ']');
+
+            IgniteTransactions txs = ignite.transactions();
+
+            final List<TestKey> keys = new ArrayList<>();
+
+            for (int i = 0; i < KEY_RANGE; i++) // Every key that key(Random) can produce.
+                keys.add(new TestKey(i));
+
+            CacheConfiguration ccfg = cache1.getConfiguration(CacheConfiguration.class);
+
+            boolean fullSync = ccfg.getWriteSynchronizationMode() == FULL_SYNC;
+            boolean optimistic = concurrency == OPTIMISTIC;
+
+            boolean checkData = fullSync && !optimistic; // Per-operation results are asserted only for FULL_SYNC + PESSIMISTIC.
+
+            for (int i = 0; i < 10_000; i++) {
+                if (i % 100 == 0)
+                    log.info("Iteration: " + i);
+
+                boolean rollback = i % 10 == 0; // Every 10th transaction (starting with the first) is rolled back.
+
+                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                    cacheOperation(expData1, rnd, cache1, checkData, rollback);
+
+                    if (crossCacheTx)
+                        cacheOperation(expData2, rnd, cache2, checkData, rollback);
+
+                    if (rollback)
+                        tx.rollback();
+                    else
+                        tx.commit();
+                }
+            }
+
+            if (fullSync) { // Final contents are compared on every grid only in FULL_SYNC mode.
+                checkData(caches1, keys, expData1);
+                checkData(caches2, keys, expData2);
+
+                cache1.removeAll();
+                cache2.removeAll();
+
+                checkData(caches1, keys, new HashMap<TestKey, TestValue>()); // After removeAll() every key must read as null.
+                checkData(caches2, keys, new HashMap<TestKey, TestValue>());
+            }
+        }
+        finally {
+            cache1.removeAll(); // Caches are cleared even on failure; the caller runs this method again on the same caches.
+            cache2.removeAll();
+        }
+    }
+
+    /**
+     * @param caches Caches to read from; each one is compared with the same expected data.
+     * @param keys Keys to read from every cache.
+     * @param expData Expected data; a key missing from this map is expected to read as {@code null}.
+     */
+    private void checkData(List<IgniteCache<TestKey, TestValue>> caches,
+        List<TestKey> keys, Map<TestKey, TestValue> expData) {
+        for (IgniteCache<TestKey, TestValue> cache : caches) {
+            for (TestKey key : keys) {
+                TestValue val = cache.get(key);
+                TestValue expVal = expData.get(key);
+
+                assertEquals(expVal, val);
+            }
+        }
+    }
+
+    /**
+     * @param expData Expected cache data; updated only when the transaction will not be rolled back.
+     * @param rnd Random used to choose the key, the value and the operation.
+     * @param cache Cache to run the operation on.
+     * @param checkData If {@code true} asserts the operation result against the expected data.
+     * @param willRollback {@code True} if will rollback transaction.
+     */
+    private void cacheOperation(
+        Map<TestKey, TestValue> expData,
+        Random rnd,
+        IgniteCache<TestKey, TestValue> cache,
+        boolean checkData,
+        boolean willRollback) {
+        TestKey key = key(rnd);
+        TestValue val = new TestValue(rnd.nextLong());
+
+        switch (rnd.nextInt(8)) { // Picks one of the eight operations below at random.
+            case 0: { // put(): no return value to check.
+                cache.put(key, val);
+
+                if (!willRollback)
+                    expData.put(key, val);
+
+                break;
+            }
+
+            case 1: { // getAndPut(): returns the previous value.
+                TestValue oldVal = cache.getAndPut(key, val);
+
+                TestValue expOld = expData.get(key);
+
+                if (checkData)
+                    assertEquals(expOld, oldVal);
+
+                if (!willRollback)
+                    expData.put(key, val);
+
+                break;
+            }
+
+            case 2: { // remove(): result is expected to be true only if the key was present.
+                boolean rmv = cache.remove(key);
+
+                if (checkData)
+                    assertEquals(expData.containsKey(key), rmv);
+
+                if (!willRollback)
+                    expData.remove(key);
+
+                break;
+            }
+
+            case 3: { // getAndRemove(): returns the removed value.
+                TestValue oldVal = cache.getAndRemove(key);
+
+                TestValue expOld = expData.get(key);
+
+                if (checkData)
+                    assertEquals(expOld, oldVal);
+
+                if (!willRollback)
+                    expData.remove(key);
+
+                break;
+            }
+
+            case 4: { // putIfAbsent(): expected to succeed only if the key is absent.
+                boolean put = cache.putIfAbsent(key, val);
+
+                boolean expPut = !expData.containsKey(key);
+
+                if (checkData)
+                    assertEquals(expPut, put);
+
+                if (expPut && !willRollback)
+                    expData.put(key, val);
+
+                break;
+            }
+
+            case 5: { // invoke() with a non-null value: the processor replaces the entry value.
+                TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value()));
+                TestValue expOld = expData.get(key);
+
+                if (checkData)
+                    assertEquals(expOld, oldVal);
+
+                if (!willRollback)
+                    expData.put(key, val);
+
+                break;
+            }
+
+            case 6: { // invoke() with null: the processor only reads, so expected data is unchanged.
+                TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null));
+                TestValue expOld = expData.get(key);
+
+                if (checkData)
+                    assertEquals(expOld, oldVal);
+
+                break;
+            }
+
+            case 7: { // get(): read-only.
+                TestValue oldVal = cache.get(key);
+                TestValue expOld = expData.get(key);
+
+                if (checkData)
+                    assertEquals(expOld, oldVal);
+
+                break;
+            }
+
+            default:
+                assert false; // Not reachable: nextInt(8) returns 0..7.
+        }
+    }
+
+    /**
+     * @param rnd Random.
+     * @return New key whose value is drawn from {@code [0, KEY_RANGE)}.
+     */
+    private TestKey key(Random rnd) {
+        return new TestKey(rnd.nextInt(KEY_RANGE));
+    }
+
+    /**
+     * Serializable cache key wrapping a {@code long}; equality and hash code are based on that value.
+     */
+    private static class TestKey implements Serializable {
+        /** Wrapped key value. */
+        private long key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(long key) {
+            this.key = key;
+        }
+
+        /**
+         * @return Key.
+         */
+        public long key() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey other = (TestKey)o;
+
+            return key == other.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (int)(key ^ (key >>> 32)); // Same formula as Long.hashCode(long).
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+    }
+
+    /**
+     * Serializable cache value wrapping a {@code long}. NOTE(review): equals() is overridden without hashCode().
+     */
+    private static class TestValue implements Serializable {
+        /** Wrapped value. */
+        private long val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(long val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public long value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue other = (TestValue)o;
+
+            return val == other.val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that returns the entry's current value and, if {@code val} is not null, replaces it.
+     */
+    private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> {
+        /** New value to set, or {@code null} to leave the entry unchanged. */
+        private Long val;
+
+        /**
+         * @param val Value.
+         */
+        public TestEntryProcessor(@Nullable Long val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) {
+            TestValue old = e.getValue(); // Read before any update so the previous value is returned.
+
+            if (val != null)
+                e.setValue(new TestValue(val));
+
+            return old;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..af87a7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Subclass of {@link IgniteCacheEntryProcessorNodeJoinTest} that overrides the atomicity mode to {@code ATOMIC}.
+ */
+public class IgniteAtomicCacheEntryProcessorNodeJoinTest extends IgniteCacheEntryProcessorNodeJoinTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}