You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/09 09:27:25 UTC

[15/17] ignite git commit: ignite-4154

ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d4987a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d4987a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d4987a9

Branch: refs/heads/ignite-4154
Commit: 5d4987a9387fa2906cf8608bbbab0dc114432eb7
Parents: dc92038
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 9 10:59:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 9 10:59:29 2016 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  62 +--
 .../affinity/GridAffinityProcessor.java         |  81 ++++
 .../cache/CacheAffinitySharedManager.java       |  27 +-
 .../cache/DynamicCacheChangeBatch.java          |   2 +-
 .../GridCachePartitionExchangeManager.java      | 275 ++++++++++---
 .../dht/GridClientPartitionTopology.java        |  33 +-
 .../dht/GridDhtPartitionTopology.java           |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  27 +-
 .../dht/preloader/GridDhtPartitionFullMap.java  |  18 +
 .../dht/preloader/GridDhtPartitionMap2.java     |  62 ++-
 .../GridDhtPartitionsExchangeFuture.java        |  92 +----
 .../preloader/GridDhtPartitionsFullMessage.java |  89 ++++-
 .../GridDhtPartitionsSingleMessage.java         |  68 +++-
 .../continuous/GridContinuousProcessor.java     |   4 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  26 +-
 ...CacheExchangeMessageDuplicatedStateTest.java | 386 +++++++++++++++++++
 .../GridCacheSyncReplicatedPreloadSelfTest.java |   3 -
 .../IgniteCacheSyncRebalanceModeSelfTest.java   |   4 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 ++++++-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 21 files changed, 1138 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 9166b31..a388c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -108,7 +108,7 @@ public class GridAffinityAssignmentCache {
     private final AtomicInteger fullHistSize = new AtomicInteger();
 
     /** */
-    private final SimilarAffinityKey similarAffKey;
+    private final Object similarAffKey;
 
     /**
      * Constructs affinity cached calculations.
@@ -147,9 +147,14 @@ public class GridAffinityAssignmentCache {
         affCache = new ConcurrentSkipListMap<>();
         head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
 
-        similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt);
+        similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
+
+        assert similarAffKey != null;
     }
 
+    /**
+     * @return Key to find caches with similar affinity.
+     */
     public Object similarAffinityKey() {
         return similarAffKey;
     }
@@ -612,57 +617,4 @@ public class GridAffinityAssignmentCache {
             return S.toString(AffinityReadyFuture.class, this);
         }
     }
-
-    /**
-     *
-     */
-    private static class SimilarAffinityKey {
-        /** */
-        private final int backups;
-
-        /** */
-        private final Class<?> affFuncCls;
-
-        /** */
-        private final Class<?> filterCls;
-
-        /** */
-        private final int partsCnt;
-
-        /** */
-        private final int hash;
-
-        public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
-            this.backups = backups;
-            this.affFuncCls = affFuncCls;
-            this.filterCls = filterCls;
-            this.partsCnt = partsCnt;
-
-            int hash = backups;
-            hash = 31 * hash + affFuncCls.hashCode();
-            hash = 31 * hash + filterCls.hashCode();
-            hash= 31 * hash + partsCnt;
-
-            this.hash = hash;
-        }
-
-        @Override public int hashCode() {
-            return hash;
-        }
-
-        @Override public boolean equals(Object o) {
-            if (o == this)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            SimilarAffinityKey key = (SimilarAffinityKey)o;
-
-            return backups == key.backups &&
-                affFuncCls == key.affFuncCls &&
-                filterCls == key.filterCls &&
-                partsCnt == key.partsCnt;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index d3783f0..b9182ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -568,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         return nodes.iterator().next();
     }
 
+    /**
+     * @param aff Affinity function.
+     * @param nodeFilter Node filter.
+     * @param backups Number of backups.
+     * @param parts Number of partitions.
+     * @return Key to find caches with similar affinity.
+     */
+    public Object similaryAffinityKey(AffinityFunction aff,
+        IgnitePredicate<ClusterNode> nodeFilter,
+        int backups,
+        int parts) {
+        return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts);
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");
@@ -966,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             return aff;
         }
     }
+
+    /**
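+     * Key to find caches with similar affinity (affinity function class, node filter class, backups, partitions).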
+     */
+    private static class SimilarAffinityKey {
+        /** */
+        private final int backups;
+
+        /** */
+        private final Class<?> affFuncCls;
+
+        /** */
+        private final Class<?> filterCls;
+
+        /** */
+        private final int partsCnt;
+
+        /** */
+        private final int hash;
+
+        /**
+         * @param affFuncCls Affinity function class.
+         * @param filterCls Node filter class.
+         * @param backups Number of backups.
+         * @param partsCnt Number of partitions.
+         */
+        SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
+            this.backups = backups;
+            this.affFuncCls = affFuncCls;
+            this.filterCls = filterCls;
+            this.partsCnt = partsCnt;
+
+            int hash = backups;
+            hash = 31 * hash + affFuncCls.hashCode();
+            hash = 31 * hash + filterCls.hashCode();
+            hash= 31 * hash + partsCnt;
+
+            this.hash = hash;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return hash;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+            return backups == key.backups &&
+                affFuncCls == key.affFuncCls &&
+                filterCls == key.filterCls &&
+                partsCnt == key.partsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SimilarAffinityKey.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 88f1f97..2890887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param node Event node.
      * @param topVer Topology version.
      */
-    public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
             registeredCaches.clear();
@@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param msg Customer message.
      * @return {@code True} if minor topology version should be increased.
      */
-    public boolean onCustomEvent(CacheAffinityChangeMessage msg) {
+    boolean onCustomEvent(CacheAffinityChangeMessage msg) {
         assert lateAffAssign : msg;
 
         if (msg.exchangeId() != null) {
@@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param top Topology.
      * @param checkCacheId Cache ID.
      */
-    public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
+    void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
         if (!lateAffAssign)
             return;
 
@@ -1246,6 +1246,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is not owner.
+     * @param affCache Already calculated assignments (to reduce data stored in history).
      * @throws IgniteCheckedException If failed.
      */
     private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
@@ -1303,10 +1304,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param aff
-     * @param assign
-     * @param affCache
-     * @return
+     * @param aff Assignment cache.
+     * @param assign Assignment.
+     * @param affCache Assignments already calculated for other caches.
+     * @return Assignment.
      */
     private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff,
         List<List<ClusterNode>> assign,
@@ -1393,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */
-    public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+    private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
         final AffinityTopologyVersion topVer = fut.topologyVersion();
 
@@ -1580,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param aff Affinity cache.
          * @param initAff Existing affinity cache.
          */
-        public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) {
+        CacheHolder(boolean rebalanceEnabled,
+            GridAffinityAssignmentCache aff,
+            @Nullable GridAffinityAssignmentCache initAff) {
             this.aff = aff;
 
             if (initAff != null)
@@ -1632,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created cache is started on coordinator.
      */
-    class CacheHolder1 extends CacheHolder {
+    private class CacheHolder1 extends CacheHolder {
         /** */
         private final GridCacheContext cctx;
 
@@ -1640,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param cctx Cache context.
          * @param initAff Current affinity.
          */
-        public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
+        CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
             super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff);
 
             assert !cctx.isLocal() : cctx.name();
@@ -1677,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created if cache is not started on coordinator.
      */
-    static class CacheHolder2 extends CacheHolder {
+    private static class CacheHolder2 extends CacheHolder {
         /** */
         private final GridCacheSharedContext cctx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 4dcff9b..39e1c50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -86,7 +86,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
      * @param clientNodes Client nodes map.
      */
     public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) {
-        this.clientNodes = clientNodes;
+        this.clientNodes = null;//clientNodes;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a81bf0f..928500f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,6 +21,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
@@ -44,7 +45,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -56,6 +59,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -72,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -81,6 +86,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -532,8 +538,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (top != null)
             return top;
 
+        Object affKey = null;
+
+        DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);
+
+        if (desc != null) {
+            CacheConfiguration ccfg = desc.cacheConfiguration();
+
+            AffinityFunction aff = ccfg.getAffinity();
+
+            affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
+                ccfg.getNodeFilter(),
+                ccfg.getBackups(),
+                aff.partitions());
+        }
+
         GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId,
-            top = new GridClientPartitionTopology(cctx, cacheId, exchFut));
+            top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey));
 
         return old != null ? old : top;
     }
@@ -762,43 +783,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param nodes Nodes.
      * @return {@code True} if message was sent, {@code false} if node left grid.
      */
-    private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
-
-        boolean useOldApi = false;
-        boolean compress = true;
-
-        for (ClusterNode node : nodes) {
-            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
-                useOldApi = true;
-                compress = false;
-
-                break;
-            }
-            else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
-                compress = false;
-        }
-
-        m.compress(compress);
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal() && cacheCtx.started()) {
-                GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
-                if (useOldApi) {
-                    locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
-                        locMap.nodeOrder(),
-                        locMap.updateSequence(),
-                        locMap);
-                }
-
-                m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-            }
-        }
-
-        // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
+    private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
 
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@ -821,32 +807,140 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
-     * @param id ID.
+     * @param nodes Target nodes.
+     * @param exchId Non-null exchange ID if message is created for exchange.
+     * @param lastVer Last version.
+     * @param compress {@code True} if it is possible to use compression for message.
+     * @return Message.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
-            cctx.kernalContext().clientNode(),
-            cctx.versions().last(),
-            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
+    public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+        @Nullable GridDhtPartitionExchangeId exchId,
+        @Nullable GridCacheVersion lastVer,
+        boolean compress) {
+        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+                lastVer,
+                exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+
+        boolean useOldApi = false;
+
+        if (nodes != null) {
+            for (ClusterNode node : nodes) {
+                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
+                    useOldApi = true;
+                    compress = false;
+
+                    break;
+                }
+                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+                    compress = false;
+            }
+        }
+
+        m.compress(compress);
+
+        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
-                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+                boolean ready;
 
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                    locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+                if (exchId != null) {
+                    AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+                    ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+                }
+                else
+                    ready = cacheCtx.started();
 
-                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+                if (ready) {
+                    GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+
+                    if (useOldApi) {
+                        locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
+                            locMap.nodeOrder(),
+                            locMap.updateSequence(),
+                            locMap);
+                    }
+
+                    addFullPartitionsMap(m,
+                        dupData,
+                        compress,
+                        cacheCtx.cacheId(),
+                        locMap,
+                        cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+                    if (exchId != null)
+                        m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+                }
             }
         }
 
-        for (GridClientPartitionTopology top : clientTops.values()) {
-            GridDhtPartitionMap2 locMap = top.localPartitionMap();
+        // It is important that client topologies be added after contexts.
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+            GridDhtPartitionFullMap map = top.partitionMap(true);
+
+            addFullPartitionsMap(m,
+                dupData,
+                compress,
+                top.cacheId(),
+                map,
+                top.similarAffinityKey());
+
+            if (exchId != null)
+                m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+        }
+
+        return m;
+    }
+
+    /**
+     * @param m Message.
+     * @param dupData Duplicated data map.
+     * @param compress {@code True} if it is necessary to check for duplicated partition state data.
+     * @param cacheId Cache ID.
+     * @param map Map to add.
+     * @param affKey Cache affinity key.
+     */
+    private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
+        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
+        boolean compress,
+        Integer cacheId,
+        GridDhtPartitionFullMap map,
+        Object affKey) {
+        Integer dupDataCache = null;
 
-            m.addLocalPartitionMap(top.cacheId(), locMap);
+        if (compress && affKey != null && !m.containsCache(cacheId)) {
+            T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
+
+            if (state0 != null && state0.get2().partitionStateEquals(map)) {
+                GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(),
+                    map.nodeOrder(),
+                    map.updateSequence());
+
+                for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet())
+                    map0.put(e.getKey(), e.getValue().emptyCopy());
+
+                map = map0;
+
+                dupDataCache = state0.get1();
+            }
+            else
+                dupData.put(affKey, new T2<>(cacheId, map));
         }
 
+        m.addFullPartitionsMap(cacheId, map, dupDataCache);
+    }
+
+    /**
+     * @param node Node.
+     * @param id ID.
+     */
+    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
+        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
+            id,
+            cctx.kernalContext().clientNode(),
+            false);
+
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
 
@@ -864,6 +958,81 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param targetNode Target node.
+     * @param exchangeId ID.
+     * @param clientOnlyExchange Client exchange flag.
+     * @param sndCounters {@code True} if partition update counters need to be sent.
+     * @return Message.
+     */
+    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
+        @Nullable GridDhtPartitionExchangeId exchangeId,
+        boolean clientOnlyExchange,
+        boolean sndCounters)
+    {
+        boolean compress =
+            targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0;
+
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
+            clientOnlyExchange,
+            cctx.versions().last(),
+            compress);
+
+        Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
+
+        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+            if (!cacheCtx.isLocal()) {
+                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+
+                if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+                    locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+
+                addPartitionMap(m,
+                    dupData,
+                    compress,
+                    cacheCtx.cacheId(),
+                    locMap,
+                    cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+                if (sndCounters)
+                    m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+            }
+        }
+
+        return m;
+    }
+
+    /**
+     * @param m Message.
+     * @param dupData Duplicated data map.
+     * @param compress {@code True} if it is necessary to check for duplicated partition state data.
+     * @param cacheId Cache ID.
+     * @param map Map to add.
+     * @param affKey Cache affinity key.
+     */
+    private void addPartitionMap(GridDhtPartitionsSingleMessage m,
+        Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData,
+        boolean compress,
+        Integer cacheId,
+        GridDhtPartitionMap2 map,
+        Object affKey) {
+        Integer dupDataCache = null;
+
+        if (compress) {
+            T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey);
+
+            if (state0 != null && state0.get2().equals(map.map())) {
+                dupDataCache = state0.get1();
+
+                map.map(U.<Integer, GridDhtPartitionState>newHashMap(0));
+            }
+            else
+                dupData.put(affKey, new T2<>(cacheId, map.map()));
+        }
+
+        m.addLocalPartitionMap(cacheId, map, dupDataCache);
+    }
+
+    /**
      * @param nodeId Cause node ID.
      * @param topVer Topology version.
      * @param evt Event type.
@@ -880,7 +1049,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
-    GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
+    private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
         @Nullable Collection<DynamicCacheChangeRequest> reqs,
         @Nullable CacheAffinityChangeMessage affChangeMsg) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 58933b7..5efb317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -61,6 +61,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** Flag to control amount of output for full map. */
     private static final boolean FULL_MAP_DEBUG = false;
 
+    /** Zero update counter; entries equal to it are left out when {@code skipZeros} is {@code true}. */
+    private static final Long ZERO = 0L;
+
     /** Cache shared context. */
     private GridCacheSharedContext cctx;
 
@@ -97,18 +100,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** Partition update counters. */
     private Map<Integer, Long> cntrMap = new HashMap<>();
 
+    /** Key to find caches with similar affinity. */
+    private final Object similarAffKey;
+
     /**
      * @param cctx Context.
      * @param cacheId Cache ID.
      * @param exchFut Exchange ID.
+     * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
         GridCacheSharedContext cctx,
         int cacheId,
-        GridDhtPartitionsExchangeFuture exchFut
+        GridDhtPartitionsExchangeFuture exchFut,
+        Object similarAffKey
     ) {
         this.cctx = cctx;
         this.cacheId = cacheId;
+        this.similarAffKey = similarAffKey;
 
         topVer = exchFut.topologyVersion();
 
@@ -125,6 +134,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /**
+     * @return Key to find caches with similar affinity.
+     */
+    @Nullable public Object similarAffinityKey() {
+        return similarAffKey;
+    }
+
+    /**
      * @return Full map string representation.
      */
     @SuppressWarnings( {"ConstantConditions"})
@@ -873,11 +889,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> updateCounters() {
+    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {
-            return new HashMap<>(cntrMap);
+            if (skipZeros) {
+                Map<Integer, Long> res = U.newHashMap(cntrMap.size());
+
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    if (!e.getValue().equals(ZERO))
+                        res.put(e.getKey(), e.getValue());
+                }
+
+                return res;
+            }
+            else
+                return new HashMap<>(cntrMap);
         }
         finally {
             lock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 6e9b907..4ae4e47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology {
         @Nullable Map<Integer, Long> cntrMap);
 
     /**
+     * @param skipZeros If {@code true} then filters out zero counters.
      * @return Partition update counters.
      */
-    public Map<Integer, Long> updateCounters();
+    public Map<Integer, Long> updateCounters(boolean skipZeros);
 
     /**
      * @param part Partition to own.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 871a084..f3751ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Flag to control amount of output for full map. */
     private static final boolean FULL_MAP_DEBUG = false;
 
+    /** Zero update counter; entries equal to it are left out when {@code skipZeros} is {@code true}. */
+    private static final Long ZERO = 0L;
+
     /** Context. */
     private final GridCacheContext<?, ?> cctx;
 
@@ -1500,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> updateCounters() {
+    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {
-            Map<Integer, Long> res = new HashMap<>(cntrMap);
+            Map<Integer, Long> res;
+
+            if (skipZeros) {
+                res = U.newHashMap(cntrMap.size());
+
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = e.getValue();
+
+                    if (ZERO.equals(cntr))
+                        continue;
+
+                    res.put(e.getKey(), cntr);
+                }
+            }
+            else
+                res = new HashMap<>(cntrMap);
 
             for (int i = 0; i < locParts.length; i++) {
                 GridDhtLocalPartition part = locParts[i];
@@ -1513,7 +1531,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     continue;
 
                 Long cntr0 = res.get(part.id());
-                Long cntr1 = part.updateCounter();
+                long cntr1 = part.updateCounter();
+
+                if (skipZeros && cntr1 == 0L)
+                    continue;
 
                 if (cntr0 == null || cntr1 > cntr0)
                     res.put(part.id(), cntr1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 498d492..8f5ad17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -158,6 +158,24 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
     }
 
     /**
+     * @param fullMap Map.
+     * @return {@code True} if this map and given map contain the same data.
+     */
+    public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) {
+        if (size() != fullMap.size())
+            return false;
+
+        for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
+            GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+
+            if (m == null || !m.map().equals(e.getValue().map()))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param updateSeq New update sequence value.
      * @return Old update sequence value.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 15b5a2e..dc308ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -63,25 +63,15 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     /**
      * @param nodeId Node ID.
      * @param updateSeq Update sequence number.
-     */
-    public GridDhtPartitionMap2(UUID nodeId, long updateSeq) {
-        assert nodeId != null;
-        assert updateSeq > 0;
-
-        this.nodeId = nodeId;
-        this.updateSeq = updateSeq;
-
-        map = new HashMap<>();
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param updateSeq Update sequence number.
+     * @param top Topology version.
      * @param m Map to copy.
      * @param onlyActive If {@code true}, then only active states will be included.
      */
-    public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top,
-        Map<Integer, GridDhtPartitionState> m, boolean onlyActive) {
+    public GridDhtPartitionMap2(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> m,
+        boolean onlyActive) {
         assert nodeId != null;
         assert updateSeq > 0;
 
@@ -100,6 +90,36 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param updateSeq Update sequence number.
+     * @param top Topology version.
+     * @param map Map.
+     * @param moving Number of moving partitions.
+     */
+    private GridDhtPartitionMap2(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> map,
+        int moving) {
+        this.nodeId = nodeId;
+        this.updateSeq = updateSeq;
+        this.top = top;
+        this.map = map;
+        this.moving = moving;
+    }
+
+    /**
+     * @return Copy with empty partition state map and, accordingly, zero moving partitions.
+     */
+    public GridDhtPartitionMap2 emptyCopy() {
+        return new GridDhtPartitionMap2(nodeId,
+            updateSeq,
+            top,
+            U.<Integer, GridDhtPartitionState>newHashMap(0),
+            0); // Empty state map holds no moving partitions; keeps counter consistent with the map.
+    }
+
+    /**
      * Empty constructor required for {@link Externalizable}.
      */
     public GridDhtPartitionMap2() {
@@ -174,6 +194,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     }
 
     /**
+     * @param map Partition states map.
+     */
+    public void map(Map<Integer, GridDhtPartitionState> map) {
+        this.map = map;
+    }
+
+    /**
      * @return Node ID.
      */
     public UUID nodeId() {
@@ -277,9 +304,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
         long ver = in.readLong();
         int minorVer = in.readInt();
 
-        if (ver != 0) {
+        if (ver != 0)
             top = new AffinityTopologyVersion(ver, minorVer);
-        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6a17583..a79aba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -544,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
-                    cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters());
+                    cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
             }
 
             top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -668,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             if (top.cacheId() == cacheCtx.cacheId()) {
                                 cacheCtx.topology().update(exchId,
                                     top.partitionMap(true),
-                                    top.updateCounters());
+                                    top.updateCounters(false));
 
                                 break;
                             }
@@ -678,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
             else {
                 if (!centralizedAff)
-                    sendLocalPartitions(crd, exchId);
+                    sendLocalPartitions(crd);
 
                 initDone();
 
@@ -928,28 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param node Node.
-     * @param id ID.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+    private void sendLocalPartitions(ClusterNode node)
         throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+        GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node,
+            exchangeId(),
             clientOnlyExchange,
-            cctx.versions().last(),
-            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
-
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                    locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
-
-                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
-
-                m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
-            }
-        }
+            true);
 
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -965,60 +953,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param nodes Target nodes.
-     * @return Message;
+     * @param compress {@code True} if it is possible to use compression for message.
+     * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
         GridCacheVersion last = lastVer.get();
 
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
+        return cctx.exchange().createPartitionsFullMessage(nodes,
+            exchangeId(),
             last != null ? last : cctx.versions().last(),
-            topologyVersion());
-
-        boolean useOldApi = false;
-        boolean compress = true;
-
-        if (nodes != null) {
-            for (ClusterNode node : nodes) {
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
-                    useOldApi = true;
-                    compress = false;
-
-                    break;
-                }
-                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
-                    compress = false;
-            }
-        }
-
-        m.compress(compress);
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
-
-                boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0;
-
-                if (ready) {
-                    GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
-                    if (useOldApi)
-                        locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
-
-                    m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-
-                    m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
-                }
-            }
-        }
-
-        // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-            m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
-
-            m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
-        }
-
-        return m;
+            compress);
     }
 
     /**
@@ -1026,7 +970,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
 
         if (log.isDebugEnabled())
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
@@ -1040,7 +984,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void sendPartitions(ClusterNode oldestNode) {
         try {
-            sendLocalPartitions(oldestNode, exchId);
+            sendLocalPartitions(oldestNode);
         }
         catch (ClusterTopologyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -1244,7 +1188,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 63d63e2..3d2d380 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -22,14 +22,19 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
@@ -48,6 +53,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     private Map<Integer, GridDhtPartitionFullMap> parts;
 
     /** */
+    @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+    private Map<Integer, Integer> dupPartsData;
+
+    /** Serialized partitions. */
     private byte[] partsBytes;
 
     /** Partitions update counters. */
@@ -63,7 +72,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** */
     @GridDirectTransient
-    private boolean compress;
+    private transient boolean compress;
 
     /**
      * Required by {@link Externalizable}.
@@ -87,6 +96,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         this.topVer = topVer;
     }
 
+    /**
+     * @param compress {@code True} if it is possible to use compression for message.
+     */
     public void compress(boolean compress) {
         this.compress = compress;
     }
@@ -100,14 +112,33 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /**
      * @param cacheId Cache ID.
+     * @return {@code True} if message contains full map for given cache.
+     */
+    public boolean containsCache(int cacheId) {
+        return parts != null && parts.containsKey(cacheId);
+    }
+
+    /**
+     * @param cacheId Cache ID.
      * @param fullMap Full partitions map.
+     * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
-        if (!parts.containsKey(cacheId))
+        if (!parts.containsKey(cacheId)) {
             parts.put(cacheId, fullMap);
+
+            if (dupDataCache != null) {
+                assert parts.containsKey(dupDataCache);
+
+                if (dupPartsData == null)
+                    dupPartsData = new HashMap<>();
+
+                dupPartsData.put(cacheId, dupDataCache);
+            }
+        }
     }
 
     /**
@@ -197,6 +228,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
             else
                 parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+            if (dupPartsData != null) {
+                assert parts != null;
+
+                for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+                    GridDhtPartitionFullMap map1 = parts.get(e.getKey());
+
+                    assert map1 != null : e.getKey();
+
+                    GridDhtPartitionFullMap map2 = parts.get(e.getValue());
+
+                    assert map2 != null : e.getValue();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) {
+                        GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey());
+
+                        assert partMap1 != null && partMap1.map().isEmpty() : partMap1;
+                        assert !partMap1.hasMovingPartitions() : partMap1;
+
+                        GridDhtPartitionMap2 partMap2 = e0.getValue();
+
+                        assert partMap2 != null;
+
+                        for (Map.Entry<Integer, GridDhtPartitionState> stateEntry : partMap2.entrySet())
+                            partMap1.put(stateEntry.getKey(), stateEntry.getValue());
+                    }
+                }
+            }
         }
 
         if (parts == null)
@@ -229,18 +288,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (writer.state()) {
             case 6:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -263,7 +328,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (reader.state()) {
             case 6:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -271,7 +336,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                partsBytes = reader.readByteArray("partsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -279,6 +344,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 8:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -298,7 +371,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index a37e092..416d298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @GridDirectTransient
     private Map<Integer, GridDhtPartitionMap2> parts;
 
+    /** Maps cache ID to ID of the cache with the same partition state map. */
+    @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+    private Map<Integer, Integer> dupPartsData;
+
     /** Serialized partitions. */
     private byte[] partsBytes;
 
@@ -60,7 +68,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     private boolean client;
 
     /** */
-    private boolean compress;
+    @GridDirectTransient
+    private transient boolean compress;
 
     /**
      * Required by {@link Externalizable}.
@@ -73,6 +82,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param exchId Exchange ID.
      * @param client Client message flag.
      * @param lastVer Last version.
+     * @param compress {@code True} if it is possible to use compression for message.
      */
     public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
         boolean client,
@@ -92,16 +102,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
-     * Adds partition map to this message.
-     *
      * @param cacheId Cache ID to add local partition for.
      * @param locMap Local partition map.
+     * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) {
+    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
         parts.put(cacheId, locMap);
+
+        if (dupDataCache != null) {
+            assert F.isEmpty(locMap.map());
+
+            if (dupPartsData == null)
+                dupPartsData = new HashMap<>();
+
+            dupPartsData.put(cacheId, dupDataCache);
+        }
     }
 
     /**
@@ -183,7 +201,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             if (compressed())
                 parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
             else
-                parts =U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+                parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
         if (partCntrsBytes != null && partCntrs == null) {
@@ -192,6 +210,26 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             else
                 partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
+
+        if (dupPartsData != null) {
+            assert parts != null;
+
+            for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+                GridDhtPartitionMap2 map1 = parts.get(e.getKey());
+
+                assert map1 != null : e.getKey();
+                assert F.isEmpty(map1.map());
+                assert !map1.hasMovingPartitions();
+
+                GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+
+                assert map2 != null : e.getValue();
+                assert map2.map() != null;
+
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : map2.map().entrySet())
+                    map1.put(e0.getKey(), e0.getValue());
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -216,12 +254,18 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 8:
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -252,7 +296,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 7:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -260,6 +304,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 8:
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -279,7 +331,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3a559e7..9fd9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
                                 if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
+                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
 
                                 routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
@@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
+                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f929121..f8e38d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            TcpDiscoveryAbstractMessage msg = null;
+            TcpDiscoveryAbstractMessage msg;
 
             while (!Thread.currentThread().isInterrupted()) {
                 Socket sock;
@@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    if (msg == null)
-                        msg = queue.poll();
+                    msg = queue.poll();
 
                     if (msg == null) {
                         mux.wait();
@@ -1121,19 +1120,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                 }
-                catch (IOException e) {
-                    if (log.isDebugEnabled())
-                        U.error(log, "Failed to send node left message (will stop anyway) " +
-                            "[sock=" + sock + ", msg=" + msg + ']', e);
-
-                    U.closeQuiet(sock);
-
-                    synchronized (mux) {
-                        if (sock == this.sock)
-                            this.sock = null; // Connection has dead.
-                    }
-                }
-                catch (IgniteCheckedException e) {
+                catch (Exception e) {
                     if (spi.getSpiContext().isStopping()) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
@@ -1141,7 +1128,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                     else
                         U.error(log, "Failed to send message: " + msg, e);
 
-                    msg = null;
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (sock == this.sock)
+                            this.sock = null; // Connection has dead.
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
new file mode 100644
index 0000000..d07fdd3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that exchange messages send an empty partition map for caches listed as duplicates in {@code dupPartsData}.
+ */
+public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node started by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the first cache configured with a default {@link RendezvousAffinityFunction}. */
+    private static final String AFF1_CACHE1 = "a1c1";
+
+    /** Name of the second cache configured with a default {@link RendezvousAffinityFunction}. */
+    private static final String AFF1_CACHE2 = "a1c2";
+
+    /** Name of the first cache configured with a default {@link FairAffinityFunction}. */
+    private static final String AFF2_CACHE1 = "a2c1";
+
+    /** Name of the second cache configured with a default {@link FairAffinityFunction}. */
+    private static final String AFF2_CACHE2 = "a2c2";
+
+    /** Name of the cache with 3 backups and {@code RendezvousAffinityFunction(false, 64)}; never a duplicate key. */
+    private static final String AFF3_CACHE1 = "a3c1";
+
+    /** Name of the first cache using {@link TestNodeFilter} with a default {@link RendezvousAffinityFunction}. */
+    private static final String AFF4_FILTER_CACHE1 = "a4c1";
+
+    /** Name of the second cache using {@link TestNodeFilter} with a default {@link RendezvousAffinityFunction}. */
+    private static final String AFF4_FILTER_CACHE2 = "a4c2";
+
+    /** Name of the cache using {@link TestNodeFilter} with {@link FairAffinityFunction}; never a duplicate key. */
+    private static final String AFF5_FILTER_CACHE1 = "a5c1";
+
+    /** Client mode flag written into the configuration of each node started afterwards. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF1_CACHE1);
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF1_CACHE2);
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF2_CACHE1);
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF2_CACHE2);
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF3_CACHE1);
+            ccfg.setBackups(3);
+
+            RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64);
+            ccfg.setAffinity(aff);
+
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF4_FILTER_CACHE1);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF4_FILTER_CACHE2);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF5_FILTER_CACHE1);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExchangeMessages() throws Exception {
+        ignite(0);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, true);
+
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, true);
+
+        client = true;
+
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, false);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(1, true);
+    }
+
+    /**
+     * @param crdIdx Index of the coordinator node whose recorded full messages are checked.
+     * @param checkSingle {@code True} if single messages recorded on the other server nodes must be checked too.
+     */
+    private void checkMessages(int crdIdx, boolean checkSingle) {
+        checkFullMessages(crdIdx);
+
+        if (checkSingle)
+            checkSingleMessages(crdIdx);
+    }
+
+    /**
+     * @param crdIdx Index of the coordinator node; it must have recorded only full messages, and at least one.
+     */
+    private void checkFullMessages(int crdIdx) {
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi();
+
+        List<Object> msgs = commSpi0.recordedMessages(false);
+
+        assertTrue(msgs.size() > 0);
+
+        for (Object msg : msgs) {
+            assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage);
+
+            checkFullMessage((GridDhtPartitionsFullMessage)msg);
+        }
+    }
+
+    /**
+     * @param crdIdx Index of the coordinator node; it and all client nodes are skipped when checking single messages.
+     */
+    private void checkSingleMessages(int crdIdx) {
+        int cnt = 0;
+
+        for (Ignite ignite : Ignition.allGrids()) {
+            if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode())
+                continue;
+
+            TestRecordingCommunicationSpi commSpi0 =
+                (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+            List<Object> msgs = commSpi0.recordedMessages(false);
+
+            assertTrue(msgs.size() > 0);
+
+            for (Object msg : msgs) {
+                assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage);
+
+                checkSingleMessage((GridDhtPartitionsSingleMessage)msg);
+            }
+
+            cnt++;
+        }
+
+        assertTrue(cnt > 0);
+    }
+
+    /**
+     * @param msg Full message whose {@code dupPartsData} and {@code partCntrs} fields are read reflectively and checked.
+     */
+    private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
+        Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+        assertNotNull(dupPartsData);
+
+        checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+        checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+        checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+        Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+        if (partCntrs != null) {
+            for (Map<Integer, Long> cntrs : partCntrs.values())
+                assertTrue(cntrs.isEmpty());
+        }
+    }
+
+    /**
+     * @param msg Single message whose {@code dupPartsData} and {@code partCntrs} fields are read reflectively and checked.
+     */
+    private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) {
+        Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+        assertNotNull(dupPartsData);
+
+        checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+        checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+        checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+        Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+        if (partCntrs != null) {
+            for (Map<Integer, Long> cntrs : partCntrs.values())
+                assertTrue(cntrs.isEmpty());
+        }
+    }
+
+    /**
+     * @param cache1 Name of one cache of a pair; exactly one cache of the pair must be a key in {@code dupPartsData}.
+     * @param cache2 Name of the other cache of the pair.
+     * @param dupPartsData Map from the ID of a cache sent with empty maps to the ID of the cache holding the data.
+     * @param msg Full message; per-node maps must be empty for the key cache and non-empty for the referenced one.
+     */
+    private void checkFullMessage(String cache1,
+        String cache2,
+        Map<Integer, Integer> dupPartsData,
+        GridDhtPartitionsFullMessage msg)
+    {
+        Integer cacheId;
+        Integer dupCacheId;
+
+        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+            cacheId = CU.cacheId(cache1);
+            dupCacheId = CU.cacheId(cache2);
+        }
+        else {
+            cacheId = CU.cacheId(cache2);
+            dupCacheId = CU.cacheId(cache1);
+        }
+
+        assertTrue(dupPartsData.containsKey(cacheId));
+        assertEquals(dupCacheId, dupPartsData.get(cacheId));
+        assertFalse(dupPartsData.containsKey(dupCacheId));
+
+        Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
+
+        GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId);
+
+        for (GridDhtPartitionMap2 map : emptyFullMap.values())
+            assertEquals(0, map.map().size());
+
+        GridDhtPartitionFullMap fullMap = parts.get(dupCacheId);
+
+        for (GridDhtPartitionMap2 map : fullMap.values())
+            assertFalse(map.map().isEmpty());
+    }
+
+    /**
+     * @param cache1 Name of one cache of a pair; exactly one cache of the pair must be a key in {@code dupPartsData}.
+     * @param cache2 Name of the other cache of the pair.
+     * @param dupPartsData Map from the ID of a cache sent with an empty map to the ID of the cache holding the data.
+     * @param msg Single message; the map must be empty for the key cache and non-empty for the referenced one.
+     */
+    private void checkSingleMessage(String cache1,
+        String cache2,
+        Map<Integer, Integer> dupPartsData,
+        GridDhtPartitionsSingleMessage msg)
+    {
+        Integer cacheId;
+        Integer dupCacheId;
+
+        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+            cacheId = CU.cacheId(cache1);
+            dupCacheId = CU.cacheId(cache2);
+        }
+        else {
+            cacheId = CU.cacheId(cache2);
+            dupCacheId = CU.cacheId(cache1);
+        }
+
+        assertTrue(dupPartsData.containsKey(cacheId));
+        assertEquals(dupCacheId, dupPartsData.get(cacheId));
+        assertFalse(dupPartsData.containsKey(dupCacheId));
+
+        Map<Integer, GridDhtPartitionMap2> parts = msg.partitions();
+
+        GridDhtPartitionMap2 emptyMap = parts.get(cacheId);
+
+        assertEquals(0, emptyMap.map().size());
+
+        GridDhtPartitionMap2 map = parts.get(dupCacheId);
+
+        assertFalse(map.map().isEmpty());
+    }
+
+    /**
+     * Node filter that accepts only nodes whose order is greater than 1.
+     */
+    private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            // Do not start cache on the first node (order 1), which this test treats as the initial coordinator.
+            return node.order() > 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
index 87d02a5..cde6b8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
@@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe
     /** */
     private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    /** */
-    private static final boolean DISCO_DEBUG_MODE = false;
-
     /**
      * Constructs test.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
index 9b0637e..f3942d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
@@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest {
     /** Entry count. */
     public static final int CNT = 100_000;
-    public static final String STATIC_CACHE_NAME = "static";
+
+    /** */
+    private static final String STATIC_CACHE_NAME = "static";
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {