You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:40:11 UTC

[22/50] [abbrv] ignite git commit: ignite-4154 Optimize amount of data stored in discovery history Discovery history optimizations: - remove discarded message for discovery pending messages - remove duplicated data from TcpDiscoveryNodeAddedMessage.oldNo

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a4ff04b..90d6242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -22,14 +22,19 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
@@ -48,6 +53,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     private Map<Integer, GridDhtPartitionFullMap> parts;
 
     /** */
+    @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+    private Map<Integer, Integer> dupPartsData;
+
+    /** */
     private byte[] partsBytes;
 
     /** Partitions update counters. */
@@ -61,6 +70,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    @GridDirectTransient
+    private transient boolean compress;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -84,6 +97,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /**
+     * @param compress {@code True} if it is possible to use compression for message.
+     */
+    public void compress(boolean compress) {
+        this.compress = compress;
+    }
+
+    /**
      * @return Local partitions.
      */
     public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -92,14 +112,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /**
      * @param cacheId Cache ID.
+     * @return {@code True} if message contains full map for given cache.
+     */
+    public boolean containsCache(int cacheId) {
+        return parts != null && parts.containsKey(cacheId);
+    }
+
+    /**
+     * @param cacheId Cache ID.
      * @param fullMap Full partitions map.
+     * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
-        if (!parts.containsKey(cacheId))
+        if (!parts.containsKey(cacheId)) {
             parts.put(cacheId, fullMap);
+
+            if (dupDataCache != null) {
+                assert compress;
+                assert parts.containsKey(dupDataCache);
+
+                if (dupPartsData == null)
+                    dupPartsData = new HashMap<>();
+
+                dupPartsData.put(cacheId, dupDataCache);
+            }
+        }
     }
 
     /**
@@ -132,11 +172,38 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (parts != null && partsBytes == null)
-            partsBytes = U.marshal(ctx, parts);
+        boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null);
+
+        if (marshal) {
+            byte[] partsBytes0 = null;
+            byte[] partCntrsBytes0 = null;
+
+            if (parts != null && partsBytes == null)
+                partsBytes0 = U.marshal(ctx, parts);
 
-        if (partCntrs != null && partCntrsBytes == null)
-            partCntrsBytes = U.marshal(ctx, partCntrs);
+            if (partCntrs != null && partCntrsBytes == null)
+                partCntrsBytes0 = U.marshal(ctx, partCntrs);
+
+            if (compress) {
+                assert !compressed();
+
+                try {
+                    byte[] partsBytesZip = U.zip(partsBytes0);
+                    byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+
+                    partsBytes0 = partsBytesZip;
+                    partCntrsBytes0 = partCntrsBytesZip;
+
+                    compressed(true);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
+                }
+            }
+
+            partsBytes = partsBytes0;
+            partCntrsBytes = partCntrsBytes0;
+        }
     }
 
     /**
@@ -157,14 +224,49 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partsBytes != null && parts == null) {
+            if (compressed())
+                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+            if (dupPartsData != null) {
+                assert parts != null;
+
+                for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+                    GridDhtPartitionFullMap map1 = parts.get(e.getKey());
+                    GridDhtPartitionFullMap map2 = parts.get(e.getValue());
+
+                    assert map1 != null : e.getKey();
+                    assert map2 != null : e.getValue();
+                    assert map1.size() == map2.size();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) {
+                        GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey());
+
+                        assert partMap1 != null && partMap1.map().isEmpty() : partMap1;
+                        assert !partMap1.hasMovingPartitions() : partMap1;
+
+                        GridDhtPartitionMap2 partMap2 = e0.getValue();
+
+                        assert partMap2 != null;
+
+                        for (Map.Entry<Integer, GridDhtPartitionState> stateEntry : partMap2.entrySet())
+                            partMap1.put(stateEntry.getKey(), stateEntry.getValue());
+                    }
+                }
+            }
+        }
 
         if (parts == null)
             parts = new HashMap<>();
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partCntrsBytes != null && partCntrs == null) {
+            if (compressed())
+                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
         if (partCntrs == null)
             partCntrs = new HashMap<>();
@@ -185,19 +287,25 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 5:
+            case 6:
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 8:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 9:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -219,7 +327,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 6:
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -227,7 +343,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 6:
+            case 8:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -235,7 +351,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 7:
+            case 9:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -255,7 +371,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index e4356b1..bf08f0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @GridDirectTransient
     private Map<Integer, GridDhtPartitionMap2> parts;
 
+    /** */
+    @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+    private Map<Integer, Integer> dupPartsData;
+
     /** Serialized partitions. */
     private byte[] partsBytes;
 
@@ -59,6 +67,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** */
     private boolean client;
 
+    /** */
+    @GridDirectTransient
+    private transient boolean compress;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -70,13 +82,16 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param exchId Exchange ID.
      * @param client Client message flag.
      * @param lastVer Last version.
+     * @param compress {@code True} if it is possible to use compression for message.
      */
     public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
         boolean client,
-        @Nullable GridCacheVersion lastVer) {
+        @Nullable GridCacheVersion lastVer,
+        boolean compress) {
         super(exchId, lastVer);
 
         this.client = client;
+        this.compress = compress;
     }
 
     /**
@@ -87,16 +102,26 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
-     * Adds partition map to this message.
-     *
      * @param cacheId Cache ID to add local partition for.
      * @param locMap Local partition map.
+     * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) {
+    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
         parts.put(cacheId, locMap);
+
+        if (dupDataCache != null) {
+            assert compress;
+            assert F.isEmpty(locMap.map());
+            assert parts.containsKey(dupDataCache);
+
+            if (dupPartsData == null)
+                dupPartsData = new HashMap<>();
+
+            dupPartsData.put(cacheId, dupDataCache);
+        }
     }
 
     /**
@@ -136,22 +161,77 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (partsBytes == null && parts != null)
-            partsBytes = U.marshal(ctx, parts);
+        boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null);
+
+        if (marshal) {
+            byte[] partsBytes0 = null;
+            byte[] partCntrsBytes0 = null;
+
+            if (parts != null && partsBytes == null)
+                partsBytes0 = U.marshal(ctx, parts);
 
-        if (partCntrsBytes == null && partCntrs != null)
-            partCntrsBytes = U.marshal(ctx, partCntrs);
+            if (partCntrs != null && partCntrsBytes == null)
+                partCntrsBytes0 = U.marshal(ctx, partCntrs);
+
+            if (compress) {
+                assert !compressed();
+
+                try {
+                    byte[] partsBytesZip = U.zip(partsBytes0);
+                    byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+
+                    partsBytes0 = partsBytesZip;
+                    partCntrsBytes0 = partCntrsBytesZip;
+
+                    compressed(true);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
+                }
+            }
+
+            partsBytes = partsBytes0;
+            partCntrsBytes = partCntrsBytes0;
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partsBytes != null && parts == null) {
+            if (compressed())
+                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
+
+        if (partCntrsBytes != null && partCntrs == null) {
+            if (compressed())
+                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
+
+        if (dupPartsData != null) {
+            assert parts != null;
+
+            for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+                GridDhtPartitionMap2 map1 = parts.get(e.getKey());
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+                assert map1 != null : e.getKey();
+                assert F.isEmpty(map1.map());
+                assert !map1.hasMovingPartitions();
+
+                GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+
+                assert map2 != null : e.getValue();
+                assert map2.map() != null;
+
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : map2.map().entrySet())
+                    map1.put(e0.getKey(), e0.getValue());
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -169,19 +249,25 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         }
 
         switch (writer.state()) {
-            case 5:
+            case 6:
                 if (!writer.writeBoolean("client", client))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 7:
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 9:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -203,7 +289,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 6:
                 client = reader.readBoolean("client");
 
                 if (!reader.isLastRead())
@@ -211,7 +297,15 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 6:
+            case 7:
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -219,7 +313,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 7:
+            case 9:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -239,7 +333,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index a4106af..850b6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -81,11 +81,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 09aec81..d6865c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -600,7 +600,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
                         ", node=" + node + ']');
 
-                GridAffinityAssignment assignment = cctx.affinity().assignment(topVer);
+                AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
                 boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3a559e7..9fd9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
                                 if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
+                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
 
                                 routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
@@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
+                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7b011dd..e0f4a2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.util;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.Externalizable;
@@ -128,6 +130,8 @@ import java.util.logging.Logger;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
 import javax.management.DynamicMBean;
 import javax.management.JMException;
 import javax.management.MBeanServer;
@@ -9693,6 +9697,32 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * @param marsh Marshaller.
+     * @param zipBytes Zip-compressed bytes.
+     * @param clsLdr Class loader to use.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException
+     */
+    public static <T> T unmarshalZip(Marshaller marsh, byte[] zipBytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
+        assert marsh != null;
+        assert zipBytes != null;
+
+        try {
+            ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes));
+
+            in.getNextEntry();
+
+            return marsh.unmarshal(in, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
      * Unmarshals object from the input stream using given class loader.
      * This method should not close given input stream.
      * <p/>
@@ -9907,4 +9937,38 @@ public abstract class IgniteUtils {
         if (oldName != curName)
             LOC_IGNITE_NAME.set(oldName);
     }
+
+    /**
+     * @param bytes Byte array to compress.
+     * @return Compressed bytes.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException {
+        try {
+            if (bytes == null)
+                return null;
+
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+            try (ZipOutputStream zos = new ZipOutputStream(bos)) {
+                ZipEntry entry = new ZipEntry("");
+
+                try {
+                    entry.setSize(bytes.length);
+
+                    zos.putNextEntry(entry);
+
+                    zos.write(bytes);
+                }
+                finally {
+                    zos.closeEntry();
+                }
+            }
+
+            return bos.toByteArray();
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f929121..733d204 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            TcpDiscoveryAbstractMessage msg = null;
+            TcpDiscoveryAbstractMessage msg;
 
             while (!Thread.currentThread().isInterrupted()) {
                 Socket sock;
@@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    if (msg == null)
-                        msg = queue.poll();
+                    msg = queue.poll();
 
                     if (msg == null) {
                         mux.wait();
@@ -1121,19 +1120,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                 }
-                catch (IOException e) {
+                catch (InterruptedException e) {
                     if (log.isDebugEnabled())
-                        U.error(log, "Failed to send node left message (will stop anyway) " +
-                            "[sock=" + sock + ", msg=" + msg + ']', e);
-
-                    U.closeQuiet(sock);
+                        log.debug("Client socket writer interrupted.");
 
-                    synchronized (mux) {
-                        if (sock == this.sock)
-                            this.sock = null; // Connection has dead.
-                    }
+                    return;
                 }
-                catch (IgniteCheckedException e) {
+                catch (Exception e) {
                     if (spi.getSpiContext().isStopping()) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
@@ -1141,7 +1134,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                     else
                         U.error(log, "Failed to send message: " + msg, e);
 
-                    msg = null;
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (sock == this.sock)
+                            this.sock = null; // Connection has dead.
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0de787d..8814745 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -133,7 +133,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -167,7 +167,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
  */
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
-    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024);
+    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE, 512);
 
     /** */
     private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
@@ -1479,7 +1479,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     private void prepareNodeAddedMessage(
         TcpDiscoveryAbstractMessage msg,
         UUID destNodeId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable Collection<PendingMessage> msgs,
         @Nullable IgniteUuid discardMsgId,
         @Nullable IgniteUuid discardCustomMsgId
         ) {
@@ -1506,7 +1506,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+
+                Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
+
+                if (msgs != null) {
+                    msgs0 = new ArrayList<>(msgs.size());
+
+                    for (PendingMessage pendingMsg : msgs) {
+                        if (pendingMsg.msg != null)
+                            msgs0.add(pendingMsg.msg);
+                    }
+                }
+
+                nodeAddedMsg.messages(msgs0, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1892,7 +1904,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             assert spi.ensured(msg) && msg.verified() : msg;
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+                TcpDiscoveryNodeAddedMessage addedMsg =
+                    new TcpDiscoveryNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+
+                msg = addedMsg;
 
                 TcpDiscoveryNode node = addedMsg.node();
 
@@ -1910,12 +1925,109 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     addedMsg.clientTopology(top);
                 }
+
+                // Do not need this data for client reconnect.
+                addedMsg.oldNodesDiscoveryData(null);
+            }
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
+
+                if (addFinishMsg.clientDiscoData() != null) {
+                    addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+                    msg = addFinishMsg;
+
+                    Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
+
+                    Set<UUID> replaced = null;
+
+                    for (TcpDiscoveryAbstractMessage msg0 : msgs) {
+                        if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                            Map<UUID, Map<Integer, byte[]>> existingDiscoData =
+                                ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
+
+                            // Check if already stored message contains the same data to do not store copies multiple times.
+                            if (existingDiscoData != null) {
+                                for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
+                                    UUID nodeId = e.getKey();
+
+                                    if (F.contains(replaced, nodeId))
+                                        continue;
+
+                                    Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey());
+
+                                    if (existingData != null && mapsEqual(e.getValue(), existingData)) {
+                                        e.setValue(existingData);
+
+                                        if (replaced == null)
+                                            replaced = new HashSet<>();
+
+                                        boolean add = replaced.add(nodeId);
+
+                                        assert add;
+
+                                        if (replaced.size() == discoData.size())
+                                            break;
+                                    }
+                                }
+
+                                if (replaced != null && replaced.size() == discoData.size())
+                                    break;
+                            }
+                        }
+                    }
+                }
             }
+            else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+                clearClientAddFinished(msg.creatorNodeId());
+            else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+                clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId());
 
             msgs.add(msg);
         }
 
         /**
+         * @param clientId Client node ID.
+         */
+        private void clearClientAddFinished(UUID clientId) {
+            for (TcpDiscoveryAbstractMessage msg : msgs) {
+                if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                    TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
+
+                    if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
+                        addFinishMsg.clientDiscoData(null);
+                        addFinishMsg.clientNodeAttributes(null);
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param m1 Map 1.
+         * @param m2 Map 2.
+         * @return {@code True} if maps contain the same data.
+         */
+        private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
+            if (m1 == m2)
+                return true;
+
+            if (m1.size() == m2.size()) {
+                for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
+                    byte[] data = m2.get(e.getKey());
+
+                    if (!Arrays.equals(e.getValue(), data))
+                        return false;
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
          * Gets messages starting from provided ID (exclusive). If such
          * message is not found, {@code null} is returned (this indicates
          * a failure condition when it was already removed from queue).
@@ -2009,6 +2121,37 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     *
+     */
+    private static class PendingMessage {
+        /** */
+        TcpDiscoveryAbstractMessage msg;
+
+        /** */
+        final boolean customMsg;
+
+        /** */
+        final IgniteUuid id;
+
+        /**
+         * @param msg Message.
+         */
+        PendingMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null && msg.id() != null : msg;
+
+            this.msg = msg;
+
+            id = msg.id();
+            customMsg = msg instanceof TcpDiscoveryCustomEventMessage;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PendingMessage.class, this);
+        }
+    }
+
+    /**
      * Pending messages container.
      */
     private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
@@ -2016,7 +2159,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private static final int MAX = 1024;
 
         /** Pending messages. */
-        private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+        private final Queue<PendingMessage> msgs = new ArrayDeque<>(MAX * 2);
 
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2024,7 +2167,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Discarded message ID. */
         private IgniteUuid discardId;
 
-        /** Discarded message ID. */
+        /** Discarded custom message ID. */
         private IgniteUuid customDiscardId;
 
         /**
@@ -2034,14 +2177,14 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.add(msg);
+            msgs.add(new PendingMessage(msg));
 
             while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.poll();
+                PendingMessage polled = msgs.poll();
 
                 assert polled != null;
 
-                if (polled.id().equals(discardId))
+                if (polled.id.equals(discardId))
                     break;
             }
         }
@@ -2051,6 +2194,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          *
          * @param msgs Message.
          * @param discardId Discarded message ID.
+         * @param customDiscardId Discarded custom event message ID.
          */
         void reset(
             @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@@ -2059,8 +2203,10 @@ class ServerImpl extends TcpDiscoveryImpl {
         ) {
             this.msgs.clear();
 
-            if (msgs != null)
-                this.msgs.addAll(msgs);
+            if (msgs != null) {
+                for (TcpDiscoveryAbstractMessage msg : msgs)
+                    this.msgs.add(new PendingMessage(msg));
+            }
 
             this.discardId = discardId;
             this.customDiscardId = customDiscardId;
@@ -2070,12 +2216,52 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Discards message with provided ID and all before it.
          *
          * @param id Discarded message ID.
+         * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
          */
         void discard(IgniteUuid id, boolean custom) {
             if (custom)
                 customDiscardId = id;
             else
                 discardId = id;
+
+            cleanup();
+        }
+
+        /**
+         *
+         */
+        void cleanup() {
+            Iterator<PendingMessage> msgIt = msgs.iterator();
+
+            boolean skipMsg = discardId != null;
+            boolean skipCustomMsg = customDiscardId != null;
+
+            while (msgIt.hasNext()) {
+                PendingMessage msg = msgIt.next();
+
+                if (msg.customMsg) {
+                    if (skipCustomMsg) {
+                        assert customDiscardId != null;
+
+                        if (F.eq(customDiscardId, msg.id)) {
+                            msg.msg = null;
+
+                            return;
+                        }
+                    }
+                }
+                else {
+                    if (skipMsg) {
+                        assert discardId != null;
+
+                        if (F.eq(discardId, msg.id)) {
+                            msg.msg = null;
+
+                            return;
+                        }
+                    }
+                }
+            }
         }
 
         /**
@@ -2098,7 +2284,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             private boolean skipCustomMsg = customDiscardId != null;
 
             /** Internal iterator. */
-            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            private Iterator<PendingMessage> msgIt = msgs.iterator();
 
             /** Next message. */
             private TcpDiscoveryAbstractMessage next;
@@ -2136,13 +2322,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                 next = null;
 
                 while (msgIt.hasNext()) {
-                    TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+                    PendingMessage msg0 = msgIt.next();
 
-                    if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+                    if (msg0.customMsg) {
                         if (skipCustomMsg) {
                             assert customDiscardId != null;
 
-                            if (F.eq(customDiscardId, msg0.id()))
+                            if (F.eq(customDiscardId, msg0.id))
                                 skipCustomMsg = false;
 
                             continue;
@@ -2152,14 +2338,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (skipMsg) {
                             assert discardId != null;
 
-                            if (F.eq(discardId, msg0.id()))
+                            if (F.eq(discardId, msg0.id))
                                 skipMsg = false;
 
                             continue;
                         }
                     }
 
-                    next = msg0;
+                    if (msg0.msg == null)
+                        continue;
+
+                    next = msg0.msg;
 
                     break;
                 }
@@ -2985,9 +3174,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (pendingMsgs.msgs.isEmpty())
                 return false;
 
-            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
-                if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
-                    TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
+            for (PendingMessage pendingMsg : pendingMsgs.msgs) {
+                if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) {
+                    TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
 
                     if (addMsg.node().id().equals(nodeId) && addMsg.id().compareTo(pendingMsgs.discardId) > 0)
                         return true;
@@ -3901,8 +4090,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
                     if (data != null)
-                        spi.onExchange(node.id(), node.id(), data,
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                        spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()));
 
                     msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     }
 
     /**
+     * @param msg Message.
+     */
+    public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+        super(msg);
+
+        nodeId = msg.nodeId;
+        clientDiscoData = msg.clientDiscoData;
+        clientNodeAttrs = msg.clientNodeAttrs;
+    }
+
+    /**
      * Gets ID of the node added.
      *
      * @return ID of the node added.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 6f8e14e..bd52c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -234,14 +236,41 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param oldNodesDiscoData Discovery data from old nodes.
+     */
+    public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData) {
+        this.oldNodesDiscoData = oldNodesDiscoData;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param discoData Discovery data to add.
      */
     public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) {
         // Old nodes disco data may be null if message
         // makes more than 1 pass due to stopping of the nodes in topology.
-        if (oldNodesDiscoData != null)
-            oldNodesDiscoData.put(nodeId, discoData);
+        if (oldNodesDiscoData != null) {
+            for (Map.Entry<UUID, Map<Integer, byte[]>> existingDataEntry : oldNodesDiscoData.entrySet()) {
+                Map<Integer, byte[]> existingData = existingDataEntry.getValue();
+
+                Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
+
+                while (it.hasNext()) {
+                    Map.Entry<Integer, byte[]> discoDataEntry = it.next();
+
+                    byte[] curData = existingData.get(discoDataEntry.getKey());
+
+                    if (Arrays.equals(curData, discoDataEntry.getValue()))
+                        it.remove();
+                }
+
+                if (discoData.isEmpty())
+                    break;
+            }
+
+            if (!discoData.isEmpty())
+                oldNodesDiscoData.put(nodeId, discoData);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
new file mode 100644
index 0000000..ed186ac
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String AFF1_CACHE1 = "a1c1";
+
+    /** */
+    private static final String AFF1_CACHE2 = "a1c2";
+
+    /** */
+    private static final String AFF2_CACHE1 = "a2c1";
+
+    /** */
+    private static final String AFF2_CACHE2 = "a2c2";
+
+    /** */
+    private static final String AFF3_CACHE1 = "a3c1";
+
+    /** */
+    private static final String AFF4_FILTER_CACHE1 = "a4c1";
+
+    /** */
+    private static final String AFF4_FILTER_CACHE2 = "a4c2";
+
+    /** */
+    private static final String AFF5_FILTER_CACHE1 = "a5c1";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF1_CACHE1);
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF1_CACHE2);
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF2_CACHE1);
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF2_CACHE2);
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF3_CACHE1);
+            ccfg.setBackups(3);
+
+            RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64);
+            ccfg.setAffinity(aff);
+
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF4_FILTER_CACHE1);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF4_FILTER_CACHE2);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF5_FILTER_CACHE1);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExchangeMessages() throws Exception {
+        ignite(0);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, true);
+
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, true);
+
+        client = true;
+
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, false);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(1, true);
+    }
+
+    /**
+     * @param crdIdx Coordinator node index.
+     * @param checkSingle {@code True} if need check single messages.
+     */
+    private void checkMessages(int crdIdx, boolean checkSingle) {
+        checkFullMessages(crdIdx);
+
+        if (checkSingle)
+            checkSingleMessages(crdIdx);
+    }
+
+    /**
+     * @param crdIdx Coordinator node index.
+     */
+    private void checkFullMessages(int crdIdx) {
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi();
+
+        List<Object> msgs = commSpi0.recordedMessages(false);
+
+        assertTrue(msgs.size() > 0);
+
+        for (Object msg : msgs) {
+            assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage);
+
+            checkFullMessage((GridDhtPartitionsFullMessage)msg);
+        }
+    }
+
+    /**
+     * @param crdIdx Coordinator node index.
+     */
+    private void checkSingleMessages(int crdIdx) {
+        int cnt = 0;
+
+        for (Ignite ignite : Ignition.allGrids()) {
+            if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode())
+                continue;
+
+            TestRecordingCommunicationSpi commSpi0 =
+                (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+            List<Object> msgs = commSpi0.recordedMessages(false);
+
+            assertTrue(msgs.size() > 0);
+
+            for (Object msg : msgs) {
+                assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage);
+
+                checkSingleMessage((GridDhtPartitionsSingleMessage)msg);
+            }
+
+            cnt++;
+        }
+
+        assertTrue(cnt > 0);
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
+        Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+        assertNotNull(dupPartsData);
+
+        checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+        checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+        checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+        Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+        if (partCntrs != null) {
+            for (Map<Integer, Long> cntrs : partCntrs.values())
+                assertTrue(cntrs.isEmpty());
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) {
+        Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData");
+
+        assertNotNull(dupPartsData);
+
+        checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+        checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+        checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
+
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+        Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs");
+
+        if (partCntrs != null) {
+            for (Map<Integer, Long> cntrs : partCntrs.values())
+                assertTrue(cntrs.isEmpty());
+        }
+    }
+
+    /**
+     * @param cache1 Cache 1.
+     * @param cache2 Cache 2.
+     * @param dupPartsData Duplicated data map.
+     * @param msg Message.
+     */
+    private void checkFullMessage(String cache1,
+        String cache2,
+        Map<Integer, Integer> dupPartsData,
+        GridDhtPartitionsFullMessage msg)
+    {
+        Integer cacheId;
+        Integer dupCacheId;
+
+        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+            cacheId = CU.cacheId(cache1);
+            dupCacheId = CU.cacheId(cache2);
+        }
+        else {
+            cacheId = CU.cacheId(cache2);
+            dupCacheId = CU.cacheId(cache1);
+        }
+
+        assertTrue(dupPartsData.containsKey(cacheId));
+        assertEquals(dupCacheId, dupPartsData.get(cacheId));
+        assertFalse(dupPartsData.containsKey(dupCacheId));
+
+        Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
+
+        GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId);
+
+        for (GridDhtPartitionMap2 map : emptyFullMap.values())
+            assertEquals(0, map.map().size());
+
+        GridDhtPartitionFullMap fullMap = parts.get(dupCacheId);
+
+        for (GridDhtPartitionMap2 map : fullMap.values())
+            assertFalse(map.map().isEmpty());
+    }
+
+    /**
+     * @param cache1 Cache 1.
+     * @param cache2 Cache 2.
+     * @param dupPartsData Duplicated data map.
+     * @param msg Message.
+     */
+    private void checkSingleMessage(String cache1,
+        String cache2,
+        Map<Integer, Integer> dupPartsData,
+        GridDhtPartitionsSingleMessage msg)
+    {
+        Integer cacheId;
+        Integer dupCacheId;
+
+        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+            cacheId = CU.cacheId(cache1);
+            dupCacheId = CU.cacheId(cache2);
+        }
+        else {
+            cacheId = CU.cacheId(cache2);
+            dupCacheId = CU.cacheId(cache1);
+        }
+
+        assertTrue(dupPartsData.containsKey(cacheId));
+        assertEquals(dupCacheId, dupPartsData.get(cacheId));
+        assertFalse(dupPartsData.containsKey(dupCacheId));
+
+        Map<Integer, GridDhtPartitionMap2> parts = msg.partitions();
+
+        GridDhtPartitionMap2 emptyMap = parts.get(cacheId);
+
+        assertEquals(0, emptyMap.map().size());
+
+        GridDhtPartitionMap2 map = parts.get(dupCacheId);
+
+        assertFalse(map.map().isEmpty());
+    }
+
+    /**
+     *
+     */
+    private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            // Do not start cache on coordinator.
+            return node.order() > 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
index 5dc059b..6c577c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
@@ -630,7 +630,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
         if (cacheMode() == LOCAL)
             return;
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkEmpty();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
index 71d1182..3b0c2fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -72,6 +73,8 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         Boolean clientMode = client.get();
 
         if (clientMode != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index 2c47a1c..7b57d5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -115,21 +115,21 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
         startGrid(2);
         startGrid(3);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         for (int i = 0; i < 2; i++) {
             stopGrid(3);
 
-            awaitPartitionMapExchange(true, true);
+            awaitPartitionMapExchange(true, true, null);
 
             startGrid(3);
 
-            awaitPartitionMapExchange(true, true);
+            awaitPartitionMapExchange(true, true, null);
         }
 
         startGrid(4);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         assert rs.isEmpty();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 5716d59..de38952 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -240,7 +240,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(0, new AffinityTopologyVersion(2, waitMinorVer));
         waitForRebalancing(1, new AffinityTopologyVersion(2, waitMinorVer));
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkPartitionMapExchangeFinished();
 
@@ -250,7 +250,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         waitForRebalancing(1, 3);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkPartitionMapExchangeFinished();
 
@@ -261,7 +261,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(1, new AffinityTopologyVersion(4, waitMinorVer));
         waitForRebalancing(2, new AffinityTopologyVersion(4, waitMinorVer));
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkPartitionMapExchangeFinished();
 
@@ -271,7 +271,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         waitForRebalancing(1, 5);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkPartitionMapExchangeFinished();
 
@@ -339,7 +339,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         concurrentStartFinished = true;
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
 
@@ -607,7 +607,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 5, 1);
         waitForRebalancing(4, 5, 1);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
 
@@ -631,7 +631,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 6);
         waitForRebalancing(4, 6);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
 
@@ -641,7 +641,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 7);
         waitForRebalancing(4, 7);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkSupplyContextMapIsEmpty();
 
@@ -650,7 +650,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 8);
         waitForRebalancing(4, 8);
 
-        awaitPartitionMapExchange(true, true);
+        awaitPartitionMapExchange(true, true, null);
 
         checkPartitionMapExchangeFinished();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
index 87d02a5..cde6b8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
@@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe
     /** */
     private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    /** */
-    private static final boolean DISCO_DEBUG_MODE = false;
-
     /**
      * Constructs test.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
index 9b0637e..f3942d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
@@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest {
     /** Entry count. */
     public static final int CNT = 100_000;
-    public static final String STATIC_CACHE_NAME = "static";
+
+    /** */
+    private static final String STATIC_CACHE_NAME = "static";
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1b7fe2b..d2cb710 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -537,7 +537,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             Affinity<Object> aff = grid(i).affinity(null);
 
-            Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters();
+            Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(false);
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
                 if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
index 5ecc27a..1259faf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -68,6 +68,13 @@ public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
         assertFalse(failed);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 1ce98a5..043208c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
@@ -114,6 +115,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /** */
     private GridStringLogger strLog;
 
+    /** */
+    private CacheConfiguration[] ccfgs;
+
+    /** */
+    private boolean client;
+
     /**
      * @throws Exception If fails.
      */
@@ -152,7 +159,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(spi);
 
-        cfg.setCacheConfiguration();
+        if (ccfgs != null)
+            cfg.setCacheConfiguration(ccfgs);
+        else
+            cfg.setCacheConfiguration();
 
         cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
 
@@ -194,9 +204,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         }
         else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
             cfg.setFailureDetectionTimeout(30_000);
-        else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode")) {
+        else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode"))
             cfg.setFailureDetectionTimeout(3_000);
-        }
         else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode")) {
             cfg.setFailureDetectionTimeout(6_000);
 
@@ -205,6 +214,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         else if (gridName.contains("testNodeShutdownOnRingMessageWorkerFailureFailedNode"))
             cfg.setGridLogger(strLog = new GridStringLogger());
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -1961,6 +1972,63 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDuplicatedDiscoveryDataRemoved() throws Exception {
+        try {
+            TestDiscoveryDataDuplicateSpi.checkNodeAdded = false;
+            TestDiscoveryDataDuplicateSpi.checkClientNodeAddFinished = false;
+            TestDiscoveryDataDuplicateSpi.fail = false;
+
+            ccfgs = new CacheConfiguration[5];
+
+            for (int i = 0; i < ccfgs.length; i++) {
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setName(i == 0 ? null : ("static-cache-" + i));
+
+                ccfgs[i] = ccfg;
+            }
+
+            TestDiscoveryDataDuplicateSpi spi = new TestDiscoveryDataDuplicateSpi();
+
+            nodeSpi.set(spi);
+
+            startGrid(0);
+
+            for (int i = 0; i < 5; i++) {
+                nodeSpi.set(new TestDiscoveryDataDuplicateSpi());
+
+                startGrid(i + 1);
+            }
+
+            client = true;
+
+            Ignite clientNode = startGrid(6);
+
+            assertTrue(clientNode.configuration().isClientMode());
+
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName("c1");
+
+            clientNode.createCache(ccfg);
+
+            client = false;
+
+            nodeSpi.set(new TestDiscoveryDataDuplicateSpi());
+
+            startGrid(7);
+
+            assertTrue(TestDiscoveryDataDuplicateSpi.checkNodeAdded);
+            assertTrue(TestDiscoveryDataDuplicateSpi.checkClientNodeAddFinished);
+            assertFalse(TestDiscoveryDataDuplicateSpi.fail);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @param nodeName Node name.
      * @throws Exception If failed.
      */
@@ -2015,6 +2083,66 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     *
+     */
+    private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi {
+        /** */
+        static volatile boolean fail;
+
+        /** */
+        static volatile boolean checkNodeAdded;
+
+        /** */
+        static volatile boolean checkClientNodeAddFinished;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddedMessage)msg).oldNodesDiscoveryData();
+
+                checkDiscoData(discoData, msg);
+            }
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddFinishedMessage)msg).clientDiscoData();
+
+                checkDiscoData(discoData, msg);
+            }
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /**
+         * @param discoData Discovery data.
+         * @param msg Message.
+         */
+        private void checkDiscoData(Map<UUID, Map<Integer, byte[]>> discoData, TcpDiscoveryAbstractMessage msg) {
+            if (discoData != null && discoData.size() > 1) {
+                int cnt = 0;
+
+                for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
+                    Map<Integer, byte[]> map = e.getValue();
+
+                    if (map.containsKey(GridComponent.DiscoveryDataExchangeType.CACHE_PROC.ordinal()))
+                        cnt++;
+                }
+
+                if (cnt > 1) {
+                    fail = true;
+
+                    log.error("Expect cache data only from one node, but actually: " + cnt);
+                }
+
+                if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                    checkNodeAdded = true;
+                else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                    checkClientNodeAddFinished = true;
+            }
+        }
+    }
+
 
     /**
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 87509a4..22fa36d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -428,21 +428,42 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      */
     @SuppressWarnings("BusyWait")
     protected void awaitPartitionMapExchange() throws InterruptedException {
-        awaitPartitionMapExchange(false, false);
+        awaitPartitionMapExchange(false, false, null);
     }
 
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
      * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished.
+     * @param nodes Optional nodes.
      * @throws InterruptedException If interrupted.
      */
     @SuppressWarnings("BusyWait")
-    protected void awaitPartitionMapExchange(boolean waitEvicts, boolean waitNode2PartUpdate) throws InterruptedException {
+    protected void awaitPartitionMapExchange(boolean waitEvicts,
+        boolean waitNode2PartUpdate,
+        @Nullable Collection<ClusterNode> nodes)
+        throws InterruptedException {
         long timeout = 30_000;
 
+        long startTime = -1;
+
+        Set<String> names = new HashSet<>();
+
         for (Ignite g : G.allGrids()) {
+            if (nodes != null && !nodes.contains(g.cluster().localNode()))
+                continue;
+
             IgniteKernal g0 = (IgniteKernal)g;
 
+            names.add(g0.configuration().getGridName());
+
+            if (startTime != -1) {
+                if (startTime != g0.context().discovery().gridStartTime())
+                    fail("Found nodes from different clusters, probable some test does not stop nodes " +
+                        "[allNodes=" + names + ']');
+            }
+            else
+                startTime = g0.context().discovery().gridStartTime();
+
             for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) {
                 CacheConfiguration cfg = c.context().config();