You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:22:26 UTC

[03/10] ignite git commit: ignite-6124 Merge exchanges for multiple discovery events

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index acc4dbe..a164e85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,6 +100,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @GridDirectTransient
     private transient boolean compress;
 
+    /** Result topology version. */
+    private AffinityTopologyVersion resTopVer;
+
+    /** Caches affinity for joining nodes. */
+    @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+    private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff;
+
+    /** Difference with ideal affinity. */
+    @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+    private Map<Integer, CacheGroupAffinityMessage> idealAffDiff;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -128,6 +140,83 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /** {@inheritDoc} */
+    @Override void copyStateTo(GridDhtPartitionsAbstractMessage msg) {
+        super.copyStateTo(msg);
+
+        GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
+
+        cp.parts = parts;
+        cp.dupPartsData = dupPartsData;
+        cp.partsBytes = partsBytes;
+        cp.partCntrs = partCntrs;
+        cp.partCntrsBytes = partCntrsBytes;
+        cp.partHistSuppliers = partHistSuppliers;
+        cp.partHistSuppliersBytes = partHistSuppliersBytes;
+        cp.partsToReload = partsToReload;
+        cp.partsToReloadBytes = partsToReloadBytes;
+        cp.topVer = topVer;
+        cp.errs = errs;
+        cp.errsBytes = errsBytes;
+        cp.compress = compress;
+        cp.resTopVer = resTopVer;
+        cp.joinedNodeAff = joinedNodeAff;
+        cp.idealAffDiff = idealAffDiff;
+    }
+
+    /**
+     * @return Message copy.
+     */
+    GridDhtPartitionsFullMessage copy() {
+        GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
+
+        copyStateTo(cp);
+
+        return cp;
+    }
+
+    /**
+     * @param resTopVer Result topology version.
+     */
+    public void resultTopologyVersion(AffinityTopologyVersion resTopVer) {
+        this.resTopVer = resTopVer;
+    }
+
+    /**
+     * @return Result topology version.
+     */
+    public AffinityTopologyVersion resultTopologyVersion() {
+        return resTopVer;
+    }
+
+    /**
+     * @return Caches affinity for joining nodes.
+     */
+    @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() {
+        return joinedNodeAff;
+    }
+
+    /**
+     * @param joinedNodeAff Caches affinity for joining nodes.
+     */
+    void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
+        this.joinedNodeAff = joinedNodeAff;
+    }
+
+    /**
+     * @return Difference with ideal affinity.
+     */
+    @Nullable public Map<Integer, CacheGroupAffinityMessage> idealAffinityDiff() {
+        return idealAffDiff;
+    }
+
+    /**
+     * @param idealAffDiff Difference with ideal affinity.
+     */
+    void idealAffinityDiff(Map<Integer, CacheGroupAffinityMessage> idealAffDiff) {
+        this.idealAffDiff = idealAffDiff;
+    }
+
+    /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
     }
@@ -243,11 +332,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        boolean marshal = (parts != null && partsBytes == null) ||
-            (partCntrs != null && partCntrsBytes == null) ||
+        boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
+            (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) ||
             (partHistSuppliers != null && partHistSuppliersBytes == null) ||
             (partsToReload != null && partsToReloadBytes == null) ||
-            (errs != null && errsBytes == null);
+            (!F.isEmpty(errs) && errsBytes == null);
 
         if (marshal) {
             byte[] partsBytes0 = null;
@@ -256,10 +345,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             byte[] partsToReloadBytes0 = null;
             byte[] errsBytes0 = null;
 
-            if (parts != null && partsBytes == null)
+            if (!F.isEmpty(parts) && partsBytes == null)
                 partsBytes0 = U.marshal(ctx, parts);
 
-            if (partCntrs != null && partCntrsBytes == null)
+            if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null)
                 partCntrsBytes0 = U.marshal(ctx, partCntrs);
 
             if (partHistSuppliers != null && partHistSuppliersBytes == null)
@@ -268,7 +357,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             if (partsToReload != null && partsToReloadBytes == null)
                 partsToReloadBytes0 = U.marshal(ctx, partsToReload);
 
-            if (errs != null && errsBytes == null)
+            if (!F.isEmpty(errs) && errsBytes == null)
                 errsBytes0 = U.marshal(ctx, errs);
 
             if (compress) {
@@ -420,30 +509,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeMap("idealAffDiff", idealAffDiff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+                if (!writer.writeMap("joinedNodeAff", joinedNodeAff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
                     return false;
 
                 writer.incrementState();
 
             case 11:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeMessage("resTopVer", resTopVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -482,7 +589,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                idealAffDiff = reader.readMap("idealAffDiff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -490,7 +597,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 8:
-                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+                joinedNodeAff = reader.readMap("joinedNodeAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -498,7 +605,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 9:
-                partsBytes = reader.readByteArray("partsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -506,7 +613,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 10:
-                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -514,6 +621,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 11:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                resTopVer = reader.readMessage("resTopVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -533,7 +664,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b4d25c4..bc7d314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.HashMap;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.io.Externalizable;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -88,6 +90,16 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @GridDirectTransient
     private transient boolean compress;
 
+    /** Cache groups to get affinity for (affinity is requested when node joins cluster). */
+    @GridDirectCollection(Integer.class)
+    private Collection<Integer> grpsAffRequest;
+
+    /**
+     * Exchange finish message, sent to new coordinator when it tries to
+     * restore state after previous coordinator failed during exchange.
+     */
+    private GridDhtPartitionsFullMessage finishMsg;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -111,6 +123,34 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         this.compress = compress;
     }
 
+    /**
+     * @param finishMsg Exchange finish message (used to restore exchange state on new coordinator).
+     */
+    void finishMessage(GridDhtPartitionsFullMessage finishMsg) {
+        this.finishMsg = finishMsg;
+    }
+
+    /**
+     * @return Exchange finish message (used to restore exchange state on new coordinator).
+     */
+    GridDhtPartitionsFullMessage finishMessage() {
+        return finishMsg;
+    }
+
+    /**
+     * @param grpsAffRequest Cache groups to get affinity for (affinity is requested when node joins cluster).
+     */
+    void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) {
+        this.grpsAffRequest = grpsAffRequest;
+    }
+
+    /**
+     * @return Cache groups to get affinity for (affinity is requested when node joins cluster).
+     */
+    @Nullable public Collection<Integer> cacheGroupsAffinityRequest() {
+        return grpsAffRequest;
+    }
+
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
@@ -374,18 +414,30 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeMessage("finishMsg", finishMsg))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -432,7 +484,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 8:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                finishMsg = reader.readMessage("finishMsg");
 
                 if (!reader.isLastRead())
                     return false;
@@ -440,7 +492,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 9:
-                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+                grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -448,6 +500,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 10:
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -467,7 +535,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 13;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 4b80ee0..6317fbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -33,6 +33,9 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Initial exchange ID for current exchange (set by {@link #restoreStateRequest}). */
+    private GridDhtPartitionExchangeId restoreExchId;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -47,6 +50,28 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
         super(id, null);
     }
 
+    /**
+     * @param msgExchId Exchange ID for message.
+     * @param restoreExchId Initial exchange ID for current exchange.
+     * @return Message.
+     */
+    static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId msgExchId, GridDhtPartitionExchangeId restoreExchId) {
+        GridDhtPartitionsSingleRequest msg = new GridDhtPartitionsSingleRequest(msgExchId);
+
+        msg.restoreState(true);
+
+        msg.restoreExchId = restoreExchId;
+
+        return msg;
+    }
+
+    /**
+     * @return ID of current exchange on new coordinator.
+     */
+    GridDhtPartitionExchangeId restoreExchangeId() {
+        return restoreExchId;
+    }
+
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
@@ -71,6 +96,15 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
             writer.onHeaderWritten();
         }
 
+        switch (writer.state()) {
+            case 5:
+                if (!writer.writeMessage("restoreExchId", restoreExchId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
         return true;
     }
 
@@ -84,6 +118,17 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
         if (!super.readFrom(buf, reader))
             return false;
 
+        switch (reader.state()) {
+            case 5:
+                restoreExchId = reader.readMessage("restoreExchId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
         return reader.afterMessageRead(GridDhtPartitionsSingleRequest.class);
     }
 
@@ -94,7 +139,7 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 305da92..c8d1041 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -173,29 +172,31 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
-        supplier.onTopologyChanged(lastFut.topologyVersion());
+        supplier.onTopologyChanged(lastFut.initialVersion());
 
         demander.onTopologyChanged(lastFut);
     }
 
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
+        assert exchFut == null || exchFut.isDone();
+
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = grp.topology();
 
         if (!grp.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
+            return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion());
 
         int partCnt = grp.affinity().partitions();
 
-        assert exchFut == null || exchFut.topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchId +
-                ", grp=" + grp.name() +
-                ", topVer=" + top.topologyVersion() + ']';
+        AffinityTopologyVersion topVer = top.readyTopologyVersion();
 
-        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
+        assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
+            "Topology version mismatch [exchId=" + exchId +
+            ", grp=" + grp.name() +
+            ", topVer=" + top.readyTopologyVersion() + ']';
 
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, topVer);
 
         AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
@@ -242,7 +243,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     if (msg == null) {
                         assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
-                            exchId.topologyVersion(),
+                            assigns.topologyVersion(),
                             grp.groupId()));
                     }
 
@@ -306,12 +307,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                         GridDhtPartitionDemandMessage msg = assigns.get(n);
 
-                        if (msg == null) {
-                            assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                                top.updateSequence(),
-                                exchId.topologyVersion(),
-                                grp.groupId()));
-                        }
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            assigns.topologyVersion(),
+                            grp.groupId()));
+                    }
 
                         msg.addPartition(p, false);
                     }
@@ -612,8 +613,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                             }
                         }
                         finally {
-                            if (!partsToEvict.isEmpty())
-                                locked = true;
+                            if (!partsToEvict.isEmpty()) {
+                                if (ctx.kernalContext().isStopping()) {
+                                    partsToEvict.clear();
+
+                                    locked = false;
+                                }
+                                else
+                                    locked = true;
+                            }
                             else {
                                 boolean res = partsEvictOwning.compareAndSet(1, 0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 9db80ae..dc2fbf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -35,6 +35,13 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
     private Map<Integer, Map<Integer, T2<Long, Long>>> map;
 
     /**
+     * @return {@code True} if map is empty.
+     */
+    public synchronized boolean empty() {
+        return map == null || map.isEmpty();
+    }
+
+    /**
      * @param cacheId Cache ID.
      * @param cntrMap Counters map.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
new file mode 100644
index 0000000..42ce9b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
+
+/**
+ * Future used by a new exchange coordinator to initialize coordinator caches and, when the exchange
+ * protocol version allows it, to collect restore state responses from the other nodes.
+ */
+public class InitNewCoordinatorFuture extends GridCompoundFuture {
+    /** Full message taken from a restore state response, if some node provided one. */
+    private GridDhtPartitionsFullMessage fullMsg;
+
+    /** IDs of nodes whose response (or failure) is still awaited. */
+    private Set<UUID> awaited = new HashSet<>();
+
+    /** Received single messages that carried no full message, by sender node. */
+    private Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = new HashMap<>();
+
+    /** Messages of joined nodes by node ID; filled only when no full message was received. */
+    private Map<UUID, GridDhtPartitionsSingleMessage> joinExchMsgs;
+
+    /** Created in init() only if there are awaited nodes; completed when none remain. */
+    private GridFutureAdapter restoreStateFut;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Initial version of the exchange future passed to init(). */
+    private AffinityTopologyVersion initTopVer;
+
+    /** Join exchange IDs, by node ID, of nodes present only in the current discovery cache. */
+    private Map<UUID, GridDhtPartitionExchangeId> joinedNodes;
+
+    /** Whether exchange protocol version for the minimum node version is greater than 1. */
+    private boolean restoreState;
+
+    /**
+     * @param cctx Context.
+     */
+    InitNewCoordinatorFuture(GridCacheSharedContext cctx) {
+        this.log = cctx.logger(getClass());
+    }
+
+    /**
+     * Initializes coordinator caches and, if state restore is enabled, sends restore state requests to
+     * alive remote nodes (plus, with exchange merge, nodes known only to the current discovery cache).
+     *
+     * @param exchFut Current future.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void init(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
+        initTopVer = exchFut.initialVersion();
+
+        GridCacheSharedContext cctx = exchFut.sharedContext();
+
+        restoreState = exchangeProtocolVersion(exchFut.context().events().discoveryCache().minimumNodeVersion()) > 1;
+
+        boolean newAff = exchFut.localJoinExchange();
+
+        IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(exchFut, newAff);
+
+        if (fut != null)
+            add(fut);
+
+        if (restoreState) {
+            DiscoCache curDiscoCache = cctx.discovery().discoCache();
+
+            DiscoCache discoCache = exchFut.events().discoveryCache();
+
+            List<ClusterNode> nodes = new ArrayList<>();
+
+            synchronized (this) {
+                for (ClusterNode node : discoCache.allNodes()) {
+                    if (!node.isLocal() && cctx.discovery().alive(node)) {
+                        awaited.add(node.id());
+
+                        nodes.add(node);
+                    }
+                }
+
+                if (exchFut.context().mergeExchanges() && !curDiscoCache.version().equals(discoCache.version())) {
+                    for (ClusterNode node : curDiscoCache.allNodes()) {
+                        if (discoCache.node(node.id()) == null) {
+                            // NOTE(review): stops at the first protocol v1 node, later nodes are not examined - confirm.
+                            if (exchangeProtocolVersion(node.version()) == 1)
+                                break;
+
+                            awaited.add(node.id());
+
+                            nodes.add(node);
+
+                            if (joinedNodes == null)
+                                joinedNodes = new HashMap<>();
+
+                            GridDhtPartitionExchangeId exchId = new GridDhtPartitionExchangeId(node.id(),
+                                EVT_NODE_JOINED,
+                                new AffinityTopologyVersion(node.order()));
+
+                            joinedNodes.put(node.id(), exchId);
+                        }
+                    }
+                }
+
+                if (joinedNodes == null)
+                    joinedNodes = Collections.emptyMap();
+
+                if (!awaited.isEmpty()) {
+                    restoreStateFut = new GridFutureAdapter();
+
+                    add(restoreStateFut);
+                }
+            }
+
+            log.info("Try restore exchange result [allNodes=" + awaited +
+                ", joined=" + joinedNodes.keySet() + ']');
+
+            if (!nodes.isEmpty()) {
+                GridDhtPartitionsSingleRequest req = GridDhtPartitionsSingleRequest.restoreStateRequest(exchFut.exchangeId(),
+                    exchFut.exchangeId());
+
+                for (ClusterNode node : nodes) {
+                    try {
+                        GridDhtPartitionsSingleRequest sndReq = req;
+
+                        if (joinedNodes.containsKey(node.id())) {
+                            sndReq = GridDhtPartitionsSingleRequest.restoreStateRequest(
+                                joinedNodes.get(node.id()),
+                                exchFut.exchangeId());
+                        }
+
+                        cctx.io().send(node, sndReq, GridIoPolicy.SYSTEM_POOL);
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send partitions request, node failed: " + node);
+
+                        onNodeLeft(node.id());
+                    }
+                }
+            }
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * @return {@code True} if restore state requests are used (exchange protocol version is greater than 1).
+     */
+    boolean restoreState() {
+        return restoreState;
+    }
+
+    /**
+     * @return Received messages.
+     */
+    Map<ClusterNode, GridDhtPartitionsSingleMessage> messages() {
+        return msgs;
+    }
+
+    /**
+     * @return Full message if some of nodes received it from previous coordinator.
+     */
+    GridDhtPartitionsFullMessage fullMessage() {
+        return fullMsg;
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     */
+    public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+        log.info("Init new coordinator, received response [node=" + node.id() +
+            ", fullMsg=" + (msg.finishMessage() != null) +
+            ", affReq=" + !F.isEmpty(msg.cacheGroupsAffinityRequest()) + ']');
+
+        assert msg.restoreState() : msg;
+
+        boolean done = false;
+
+        synchronized (this) {
+            if (awaited.remove(node.id())) {
+                GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
+
+                if (fullMsg0 != null) {
+                    assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
+
+                    fullMsg = fullMsg0;
+                }
+                else
+                    msgs.put(node, msg);
+
+                done = awaited.isEmpty();
+            }
+
+            if (done)
+                onAllReceived();
+        }
+
+        if (done)
+            restoreStateFut.onDone();
+    }
+
+    /**
+     * Post-processes messages of joined nodes once nothing is awaited; callers hold the lock on {@code this}.
+     */
+    private void onAllReceived() {
+        if (fullMsg != null) {
+            AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion();
+
+            for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+
+                GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
+
+                if (msgVer != null) {
+                    assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
+
+                    log.info("Process joined node message [resVer=" + resVer +
+                        ", initTopVer=" + initTopVer +
+                        ", msgVer=" + msgVer.topologyVersion() + ']');
+
+                    if (msgVer.topologyVersion().compareTo(resVer) > 0)
+                        it.remove();
+                    else
+                        e.getValue().exchangeId(msgVer);
+                }
+            }
+        }
+        else {
+            for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+
+                GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
+
+                if (msgVer != null) {
+                    it.remove();
+
+                    assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
+
+                    log.info("Process joined node message [initTopVer=" + initTopVer +
+                        ", msgVer=" + msgVer.topologyVersion() + ']');
+
+                    if (joinExchMsgs == null)
+                        joinExchMsgs = new HashMap<>();
+
+                    e.getValue().exchangeId(msgVer);
+
+                    joinExchMsgs.put(e.getKey().id(), e.getValue());
+                }
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return Message of the joined node collected for its join exchange, or {@code null} if there is none.
+     */
+    @Nullable GridDhtPartitionsSingleMessage joinExchangeMessage(UUID nodeId) {
+        return joinExchMsgs != null ? joinExchMsgs.get(nodeId) : null;
+    }
+
+    /**
+     * @param nodeId Failed node ID.
+     */
+    public void onNodeLeft(UUID nodeId) {
+        log.info("Init new coordinator, node left [node=" + nodeId + ']');
+
+        boolean done;
+
+        synchronized (this) {
+            done = awaited.remove(nodeId) && awaited.isEmpty();
+
+            // The last awaited node may leave instead of responding: messages collected so far
+            // still have to be post-processed, exactly as onMessage() does for the last response.
+            if (done)
+                onAllReceived();
+        }
+
+        if (done)
+            restoreStateFut.onDone();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
index 9a76a8e..7e473be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
@@ -35,6 +35,11 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork
         this.exchId = exchId;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean skipForExchangeMerge() {
+        return true; // Constant answer: this task type always reports true here.
+    }
+
     /**
      * @return Exchange ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index b27591e..428355c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -323,7 +323,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @return Near entries.
      */
     public Set<Cache.Entry<K, V>> nearEntries() {
-        final AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+        final AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
 
         return super.entrySet(new CacheEntryPredicateAdapter() {
             @Override public boolean apply(GridCacheEntryEx entry) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index a49812e..9d9c682 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -333,7 +333,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             remapKeys.add(key);
                     }
 
-                    AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
+                    AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion();
 
                     assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -626,7 +626,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     return true;
                 }
                 else {
-                    boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+                    boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().lastTopologyChangeVersion());
 
                     // Entry not found, do not continue search if topology did not change and there is no store.
                     return !cctx.readThroughConfigured() && (topStable || partitionOwned(part));

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index db030b0..bb71337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -787,7 +787,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
 
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
-                if (fut.topologyVersion().equals(topVer)){
+                if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)){
                     Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
 
                     if (err != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1995e2e..69d0940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -554,6 +554,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             tx.subjectId(),
             tx.taskNameHash(),
             m.clientFirst(),
+            txNodes.size() == 1,
             tx.activeCachesDeploymentEnabled());
 
         for (IgniteTxEntry txEntry : writes) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index b763f02..2c23a7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -540,6 +540,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     tx.subjectId(),
                     tx.taskNameHash(),
                     m.clientFirst(),
+                    true,
                     tx.activeCachesDeploymentEnabled());
 
                 for (IgniteTxEntry txEntry : m.entries()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index db065a7..d017d7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -211,6 +211,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             tx.subjectId(),
             tx.taskNameHash(),
             false,
+            true,
             tx.activeCachesDeploymentEnabled());
 
         for (IgniteTxEntry txEntry : writes) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 875f397..7deceb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -54,6 +54,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** */
     private static final int EXPLICIT_LOCK_FLAG_MASK = 0x08;
 
+    /** Flag bit set when the request is allowed to wait for the topology future. */
+    private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
+
     /** Future ID. */
     private IgniteUuid futId;
 
@@ -97,6 +100,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
      * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node.
+     * @param allowWaitTopFut {@code True} if it is safe for first client request to wait for topology future.
      * @param addDepInfo Deployment info flag.
      */
     public GridNearTxPrepareRequest(
@@ -116,6 +120,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         @Nullable UUID subjId,
         int taskNameHash,
         boolean firstClientReq,
+        boolean allowWaitTopFut,
         boolean addDepInfo
     ) {
         super(tx,
@@ -140,6 +145,15 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         setFlag(implicitSingle, IMPLICIT_SINGLE_FLAG_MASK);
         setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
         setFlag(firstClientReq, FIRST_CLIENT_REQ_FLAG_MASK);
+        setFlag(allowWaitTopFut, ALLOW_WAIT_TOP_FUT_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if it is safe for first client request to wait for topology future
+     *      completion.
+     */
+    public boolean allowWaitTopologyFuture() {
+        return isFlag(ALLOW_WAIT_TOP_FUT_FLAG_MASK); // Bit is written by the constructor from allowWaitTopFut.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 3ef9e61..e32ecc6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -721,7 +721,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        DiscoveryEvent discoEvt = fut.discoveryEvent();
+        DiscoveryEvent discoEvt = fut.firstEvent();
 
         boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
 
@@ -737,7 +737,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         if (cctx.kernalContext().query().moduleEnabled()) {
             for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
-                if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) &&
+                if (cacheCtx.startTopologyVersion().equals(fut.initialVersion()) &&
                     !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
                     final int cacheId = cacheCtx.cacheId();
 
@@ -1558,6 +1558,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     long partMetaId = pageMem.partitionMetaPageId(grpId, i);
                     long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+
                     try {
                         long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
 
@@ -1622,6 +1623,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * @param cacheCtx Cache context to apply an update.
      * @param dataEntry Data entry to apply.
+     * @throws IgniteCheckedException If failed to restore.
      */
     private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
         int partId = dataEntry.partitionId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 87eed82..023c03c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -668,7 +668,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
             this.cctx = cctx;
             this.part = part;
 
-            nodes = fallbacks(cctx.discovery().topologyVersionEx());
+            nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
 
             if (F.isEmpty(nodes))
                 throw new ClusterTopologyException("Failed to execute the query " +
@@ -826,7 +826,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                         if (retryFut != null)
                             retryFut.get();
 
-                        nodes = fallbacks(unreservedTopVer == null ? cctx.discovery().topologyVersionEx() : unreservedTopVer);
+                        nodes = fallbacks(unreservedTopVer == null ? cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
 
                         unreservedTopVer = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eccb9c1..4d85db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -478,7 +478,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     return topVer;
             }
 
-            return cctx.exchange().topologyVersion();
+            return cctx.exchange().readyAffinityVersion();
         }
 
         return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index bb85a1e..beeb184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
@@ -74,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
@@ -109,10 +111,10 @@ public class IgniteTxHandler {
     private GridCacheSharedContext<?, ?> ctx;
 
     /**
-     * @param nearNodeId Node ID.
+     * @param nearNodeId Sender node ID.
      * @param req Request.
      */
-    private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+    private void processNearTxPrepareRequest(UUID nearNodeId, GridNearTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
                 ", node=" + nearNodeId + ']');
@@ -130,7 +132,29 @@ public class IgniteTxHandler {
             return;
         }
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNode, req);
+        processNearTxPrepareRequest0(nearNode, req);
+    }
+
+    /**
+     * @param nearNode Sender node.
+     * @param req Request.
+     */
+    private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut;
+
+        if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
+            for (;;) {
+                if (waitForExchangeFuture(nearNode, req))
+                    return;
+
+                fut = prepareNearTx(nearNode, req);
+
+                if (fut != null)
+                    break;
+            }
+        }
+        else
+            fut = prepareNearTx(nearNode, req);
 
         assert req.txState() != null || fut == null || fut.error() != null ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
@@ -303,9 +327,9 @@ public class IgniteTxHandler {
     /**
      * @param nearNode Node that initiated transaction.
      * @param req Near prepare request.
-     * @return Prepare future.
+     * @return Prepare future or {@code null} if need retry operation.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+    @Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
         final ClusterNode nearNode,
         final GridNearTxPrepareRequest req
     ) {
@@ -348,55 +372,90 @@ public class IgniteTxHandler {
                 top = firstEntry.context().topology();
 
                 top.readLock();
+
+                if (req.allowWaitTopologyFuture()) {
+                    GridDhtTopologyFuture topFut = top.topologyVersionFuture();
+
+                    if (!topFut.isDone()) {
+                        top.readUnlock();
+
+                        return null;
+                    }
+                }
             }
 
             try {
-                if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) {
-                    if (txPrepareMsgLog.isDebugEnabled()) {
-                        txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
-                            "txId=" + req.version() +
-                            ", node=" + nearNode.id() +
-                            ", reqTopVer=" + req.topologyVersion() +
-                            ", locTopVer=" + top.topologyVersion() +
-                            ", req=" + req + ']');
-                    }
+                if (top != null ) {
+                    boolean retry = false;
 
-                    GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
-                        req.partition(),
-                        req.version(),
-                        req.futureId(),
-                        req.miniId(),
-                        req.version(),
-                        req.version(),
-                        null,
-                        null,
-                        top.topologyVersion(),
-                        req.onePhaseCommit(),
-                        req.deployInfo() != null);
+                    GridDhtTopologyFuture topFut = top.topologyVersionFuture();
 
-                    try {
-                        ctx.io().send(nearNode, res, req.policy());
+                    if (!req.allowWaitTopologyFuture() && !topFut.isDone()) {
+                        retry = true;
 
                         if (txPrepareMsgLog.isDebugEnabled()) {
-                            txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
-                                ", node=" + nearNode.id() + ']');
+                            txPrepareMsgLog.debug("Topology change is in progress, need remap transaction [" +
+                                "txId=" + req.version() +
+                                ", node=" + nearNode.id() +
+                                ", reqTopVer=" + req.topologyVersion() +
+                                ", locTopVer=" + top.readyTopologyVersion() +
+                                ", req=" + req + ']');
                         }
                     }
-                    catch (ClusterTopologyCheckedException ignored) {
+
+                    if (!retry && needRemap(req.topologyVersion(), top.readyTopologyVersion(), req)) {
+                        retry = true;
+
                         if (txPrepareMsgLog.isDebugEnabled()) {
-                            txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
+                            txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
                                 "txId=" + req.version() +
-                                ", node=" + nearNode.id() + ']');
+                                ", node=" + nearNode.id() +
+                                ", reqTopVer=" + req.topologyVersion() +
+                                ", locTopVer=" + top.readyTopologyVersion() +
+                                ", req=" + req + ']');
                         }
                     }
-                    catch (IgniteCheckedException e) {
-                        U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
-                            "[txId=" + req.version() +
-                            ", node=" + nearNode.id() +
-                            ", req=" + req + ']', e);
+
+                    if (retry) {
+                        GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                            req.partition(),
+                            req.version(),
+                            req.futureId(),
+                            req.miniId(),
+                            req.version(),
+                            req.version(),
+                            null,
+                            null,
+                            top.lastTopologyChangeVersion(),
+                            req.onePhaseCommit(),
+                            req.deployInfo() != null);
+
+                        try {
+                            ctx.io().send(nearNode, res, req.policy());
+
+                            if (txPrepareMsgLog.isDebugEnabled()) {
+                                txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
+                                    ", node=" + nearNode.id() + ']');
+                            }
+                        }
+                        catch (ClusterTopologyCheckedException ignored) {
+                            if (txPrepareMsgLog.isDebugEnabled()) {
+                                txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
+                                    "txId=" + req.version() +
+                                    ", node=" + nearNode.id() + ']');
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
+                                "[txId=" + req.version() +
+                                ", node=" + nearNode.id() +
+                                ", req=" + req + ']', e);
+                        }
+
+                        return new GridFinishedFuture<>(res);
                     }
 
-                    return new GridFinishedFuture<>(res);
+                    assert topFut.isDone();
                 }
 
                 tx = new GridDhtTxLocal(
@@ -499,6 +558,53 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param node Sender node; passed back to {@code processNearTxPrepareRequest0} when processing is resumed.
+     * @param req Near prepare request; must be a first client request (asserted).
+     * @return {@code True} if a listener on the exchange future will re-process the request, else {@code false}.
+     */
+    private boolean waitForExchangeFuture(final ClusterNode node, final GridNearTxPrepareRequest req) {
+        assert req.firstClientRequest() : req;
+
+        GridDhtTopologyFuture topFut = ctx.exchange().lastTopologyFuture();
+
+        if (!topFut.isDone()) {
+            Thread curThread = Thread.currentThread();
+
+            if (curThread instanceof IgniteThread) {
+                final IgniteThread thread = (IgniteThread)curThread;
+
+                if (thread.cachePoolThread()) { // Do not block this thread in get(): resume from the listener.
+                    topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() {
+                                @Override public void run() {
+                                    try {
+                                        processNearTxPrepareRequest0(node, req);
+                                    }
+                                    finally {
+                                        ctx.io().onMessageProcessed(req);
+                                    }
+                                }
+                            });
+                        }
+                    });
+
+                    return true;
+                }
+            }
+
+            try {
+                topFut.get();
+            }
+            catch (IgniteCheckedException e) { // Only logged: the method still returns false below.
+                U.error(log, "Topology future failed: " + e, e);
+            }
+        }
+
+        return false;
+    }
+
+    /**
      * @param expVer Expected topology version.
      * @param curVer Current topology version.
      * @param req Request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 01207e3..2ecea07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.LoadBalancerResource;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -502,7 +503,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             final String cacheName = F.first(cacheNames);
 
-            final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+            final AffinityTopologyVersion mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
             final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
 
             if (node == null)
@@ -542,7 +543,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             final String cacheName = F.first(cacheNames);
 
-            final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+            final AffinityTopologyVersion mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
             final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
 
             if (node == null)
@@ -779,13 +780,22 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param c Closure to execute.
-     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
-     * @return Future.
-     * @throws IgniteCheckedException Thrown in case of any errors.
+     * @param thread Thread whose stripe index or IO policy selects the executor for the closure.
+     * @param c Closure to run; a pool lookup failure is only logged, in which case {@code c} is not executed.
      */
-    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException {
-        return runLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL);
+    public void runLocalWithThreadPolicy(IgniteThread thread, Runnable c) {
+        assert thread.stripe() >= 0 || thread.policy() != GridIoPolicy.UNDEFINED : thread;
+
+        if (thread.stripe() >= 0)
+            ctx.getStripedExecutorService().execute(thread.stripe(), c);
+        else {
+            try {
+                ctx.pools().poolForPolicy(thread.policy()).execute(c);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to get pool for policy: " + thread.policy(), e);
+            }
+        }
     }
 
     /**
@@ -852,8 +862,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but
-     * in case of rejected execution re-runs the closure in the current thread (blocking).
+     * Executes closure on system pool. In case of rejected execution re-runs the closure in the current
+     * thread (blocking).
      *
      * @param c Closure to execute.
      * @return Future.
@@ -863,8 +873,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
-     * the closure in the current thread (blocking).
+     * In case of rejected execution re-runs the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
      * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
@@ -875,8 +884,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
-     * the closure in the current thread (blocking).
+     * In case of rejected execution re-runs the closure in the current thread (blocking).
      *
      * @param c Closure to execute.
      * @param plc Policy to choose executor pool.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 13a889c..3ad75cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -501,7 +501,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
             log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
                 ", topVer=" + topVer +
                 ", client=" + ctx.clientNode() +
-                ", daemon" + ctx.isDaemon() + "]");
+                ", daemon=" + ctx.isDaemon() + "]");
         }
 
         IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 84d536f..8b984c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -321,8 +321,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         try {
             GridCacheAdapter cache = ctx.cache().internalCache(req.cacheName());
 
-            if (cache == null)
-                throw new IgniteCheckedException("Cache not created or already destroyed.");
+            if (cache == null) {
+                throw new IgniteCheckedException("Cache not created or already destroyed: " +
+                    req.cacheName());
+            }
 
             GridCacheContext cctx = cache.context();
 
@@ -336,18 +338,33 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             GridDhtTopologyFuture topWaitFut = null;
 
             try {
-                GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
+                Exception remapErr = null;
+
+                AffinityTopologyVersion streamerFutTopVer = null;
+
+                if (!allowOverwrite) {
+                    GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();
 
-                AffinityTopologyVersion topVer = fut.topologyVersion();
+                    AffinityTopologyVersion topVer = topFut.isDone() ? topFut.topologyVersion() :
+                        topFut.initialVersion();
+
+                    if (topVer.compareTo(req.topologyVersion()) > 0) {
+                        remapErr = new ClusterTopologyCheckedException("DataStreamer will retry " +
+                            "data transfer at stable topology [reqTop=" + req.topologyVersion() +
+                            ", topVer=" + topFut.initialVersion() + ", node=remote]");
+                    }
+                    else if (!topFut.isDone())
+                        topWaitFut = topFut;
+                    else
+                        streamerFutTopVer = topFut.topologyVersion();
+                }
 
-                if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
-                    Exception err = new ClusterTopologyCheckedException(
-                        "DataStreamer will retry data transfer at stable topology " +
-                            "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]");
+                if (remapErr != null) {
+                    sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment());
 
-                    sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+                    return;
                 }
-                else if (allowOverwrite || fut.isDone()) {
+                else if (topWaitFut == null) {
                     job = new DataStreamerUpdateJob(ctx,
                         log,
                         req.cacheName(),
@@ -357,10 +374,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                         req.keepBinary(),
                         updater);
 
-                    waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
+                    waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(streamerFutTopVer);
                 }
-                else
-                    topWaitFut = fut;
             }
             finally {
                 if (!allowOverwrite)
@@ -378,16 +393,14 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 return;
             }
 
-            if (job != null) {
-                try {
-                    job.call();
+            try {
+                job.call();
 
-                    sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
-                }
-                finally {
-                    if (waitFut != null)
-                        waitFut.onDone();
-                }
+                sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
+            }
+            finally {
+                if (waitFut != null)
+                    waitFut.onDone();
             }
         }
         catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index eaced66..c11288c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -771,9 +771,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             try {
-                AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ?
-                        ctx.cache().context().exchange().readyAffinityVersion() :
-                        cctx.topology().topologyVersion();
+                AffinityTopologyVersion topVer;
+
+                if (!cctx.isLocal())
+                    topVer = ctx.cache().context().exchange().lastTopologyFuture().get();
+                else
+                    topVer = ctx.cache().context().exchange().readyAffinityVersion();
 
                 for (DataStreamerEntry entry : entries) {
                     List<ClusterNode> nodes;
@@ -1536,23 +1539,36 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             try {
                 GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();
 
-                final boolean allowOverwrite = allowOverwrite();
-                final boolean loc = cctx.isLocal();
+                final boolean lockTop = !cctx.isLocal() && !allowOverwrite();
+
+                GridDhtTopologyFuture topWaitFut = null;
 
-                if (!loc && !allowOverwrite)
+                if (lockTop)
                     cctx.topology().readLock();
 
                 try {
-                    GridDhtTopologyFuture fut = loc ? null : cctx.topologyVersionFuture();
+                    AffinityTopologyVersion streamerFutTopVer = null;
 
-                    AffinityTopologyVersion topVer = loc ? reqTopVer : fut.topologyVersion();
+                    if (lockTop) {
+                        GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();
 
-                    if (!allowOverwrite && !topVer.equals(reqTopVer)) {
-                        curFut.onDone(new IgniteCheckedException(
-                            "DataStreamer will retry data transfer at stable topology. " +
-                                "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]"));
+                        AffinityTopologyVersion topVer = topFut.isDone() ? topFut.topologyVersion() :
+                            topFut.initialVersion();
+
+                        if (topVer.compareTo(reqTopVer) > 0) {
+                            curFut.onDone(new IgniteCheckedException("DataStreamer will retry data transfer " +
+                                "at stable topology. [reqTop=" + reqTopVer + ", topVer=" + topFut.initialVersion() +
+                                ", node=local]"));
+
+                            return;
+                        }
+                        else if (!topFut.isDone())
+                            topWaitFut = topFut;
+                        else
+                            streamerFutTopVer = topFut.topologyVersion();
                     }
-                    else if (loc || allowOverwrite || fut.isDone()) {
+
+                    if (topWaitFut == null) {
                         IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe(
                             new DataStreamerUpdateJob(
                                 ctx,
@@ -1567,9 +1583,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                         locFuts.add(callFut);
 
-                        final GridFutureAdapter waitFut = (loc || allowOverwrite) ?
-                            null :
-                            cctx.mvcc().addDataStreamerFuture(topVer);
+                        final GridFutureAdapter waitFut =
+                            lockTop ? cctx.mvcc().addDataStreamerFuture(streamerFutTopVer) : null;
 
                         callFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
                             @Override public void apply(IgniteInternalFuture<Object> t) {
@@ -1590,18 +1605,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                             }
                         });
                     }
-                    else {
-                        fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
-                                localUpdate(entries, reqTopVer, curFut, plc);
-                            }
-                        });
-                    }
                 }
                 finally {
-                    if (!loc && !allowOverwrite)
+                    if (lockTop)
                         cctx.topology().readUnlock();
                 }
+
+                if (topWaitFut != null) {
+                    // Must call 'listen' only after the topology read lock is released.
+                    topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                            localUpdate(entries, reqTopVer, curFut, plc);
+                        }
+                    });
+                }
             }
             catch (Throwable ex) {
                 curFut.onDone(new IgniteCheckedException("DataStreamer data handling failed.", ex));

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 7d71a9e..fba0a4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -209,7 +209,8 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
 
                 GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
 
-                AffinityTopologyVersion topVer = discoMgr.topologyVersionEx();
+                AffinityTopologyVersion topVer =
+                    platformCtx.kernalContext().cache().context().exchange().lastTopologyFuture().get();
 
                 int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
index f97f931..c16a7ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
@@ -39,6 +39,11 @@ public class SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTas
         this.msg = msg;
     }
 
+    /** {@inheritDoc} Always {@code false} for this task. */
+    @Override public boolean skipForExchangeMerge() {
+        return false;
+    }
+
     /**
      * @return Message.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
index 79fbfcd..668a2c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
@@ -39,6 +39,11 @@ public class SchemaNodeLeaveExchangeWorkerTask implements CachePartitionExchange
         this.node = node;
     }
 
+    /** {@inheritDoc} Always {@code true} for this task. */
+    @Override public boolean skipForExchangeMerge() {
+        return true;
+    }
+
     /**
      * @return Node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index ec95001..74fe57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -863,8 +863,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         assert affCacheIds != null;
                         retry = true;
 
-                        mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.discovery().topologyVersionEx());
-                        affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
+                        mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.cache().context().exchange().readyAffinityVersion());
+                        affFut = ctx.cache().context().exchange().lastTopologyFuture();
 
                         if (affFut != null && !affFut.isDone()) {
                             waitForAffTop = true;
@@ -900,9 +900,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
                             case FAILOVER: {
                                 if (affCacheIds != null) {
-                                    mapTopVer = ctx.discovery().topologyVersionEx();
+                                    mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
 
-                                    affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
+                                    affFut = ctx.cache().context().exchange().lastTopologyFuture();
                                 }
 
                                 if (affFut != null && !affFut.isDone()) {