You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/01/19 12:05:43 UTC

[3/5] ignite git commit: IGNITE-4157 Use discovery custom messages instead of marshaller cache - Fixes #1271.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
new file mode 100644
index 0000000..1cff458
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.io.Serializable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *  Used to exchange mapping information on new mapping added or missing mapping requested flows.
+ *  See {@link GridMarshallerMappingProcessor} javadoc for more information.
+ */
+public final class MarshallerMappingItem implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final byte platformId;
+
+    /** */
+    private final int typeId;
+
+    /** */
+    private String clsName;
+
+    /**
+     * Class name may be null when instance is created to request missing mapping from cluster.
+     *
+     * @param platformId Platform id.
+     * @param typeId Type id.
+     * @param clsName Class name. May be null in case when the item is created to request missing mapping from grid.
+     */
+    public MarshallerMappingItem(byte platformId, int typeId, @Nullable String clsName) {
+        this.platformId = platformId;
+        this.typeId = typeId;
+        this.clsName = clsName;
+    }
+
+    /** */
+    public int typeId() {
+        return typeId;
+    }
+
+    /** */
+    public byte platformId() {
+        return platformId;
+    }
+
+    /** */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @param clsName Class name.
+     */
+    public void className(String clsName) {
+        this.clsName = clsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (!(obj instanceof MarshallerMappingItem))
+            return false;
+
+        MarshallerMappingItem that = (MarshallerMappingItem) obj;
+
+        return platformId == that.platformId
+                && typeId == that.typeId
+                && (clsName != null ? clsName.equals(that.clsName) : that.clsName == null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * 31 * ((int) platformId) + 31 * typeId + (clsName != null ? clsName.hashCode() : 0) ;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "[platformId: " + platformId + ", typeId:" + typeId + ", clsName: " + clsName + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
new file mode 100644
index 0000000..b974882
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides capabilities of sending custom discovery events to propose new mapping
+ * or request missing mapping to {@link MarshallerContextImpl}.
+ *
+ * For more information about particular events see documentation of {@link GridMarshallerMappingProcessor}.
+ */
+public final class MarshallerMappingTransport {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final GridDiscoveryManager discoMgr;
+
+    /** */
+    private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchSyncMap;
+
+    /** */
+    private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap;
+
+    /** */
+    private volatile boolean stopping;
+
+    /**
+     * @param ctx Context.
+     * @param mappingExchSyncMap Mapping exch sync map.
+     * @param clientReqSyncMap Client request sync map.
+     */
+    MarshallerMappingTransport(
+            GridKernalContext ctx,
+            ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchSyncMap,
+            ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap
+    ) {
+        this.ctx = ctx;
+        discoMgr = ctx.discovery();
+        this.mappingExchSyncMap = mappingExchSyncMap;
+        this.clientReqSyncMap = clientReqSyncMap;
+
+        stopping = false;
+    }
+
+    /**
+     * @param item Item.
+     * @param cache Cache.
+     */
+    public GridFutureAdapter<MappingExchangeResult> awaitMappingAcceptance(
+            MarshallerMappingItem item, ConcurrentMap<Integer,
+            MappedName> cache
+    ) {
+        GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item);
+
+        GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut);
+
+        if (oldFut != null)
+            return oldFut;
+
+        MappedName mappedName = cache.get(item.typeId());
+
+        assert mappedName != null;
+
+        //double check whether mapping is accepted, first check was in MarshallerContextImpl::registerClassName
+        if (mappedName.accepted())
+            fut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
+
+        return fut;
+    }
+
+    /**
+     * @param item Item.
+     * @param cache Cache.
+     */
+    public GridFutureAdapter<MappingExchangeResult> proposeMapping(MarshallerMappingItem item, ConcurrentMap<Integer, MappedName> cache) throws IgniteCheckedException {
+        GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item);
+
+        GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut);
+
+        if (oldFut != null)
+            return oldFut;
+        else {
+            //double check, first check was in caller: MarshallerContextImpl::registerClassName
+            MappedName mapping = cache.get(item.typeId());
+
+            if (mapping != null) {
+                String mappedClsName = mapping.className();
+
+                if (!mappedClsName.equals(item.className()))
+                    fut.onDone(MappingExchangeResult.createFailureResult(duplicateMappingException(item, mappedClsName)));
+                else if (mapping.accepted())
+                    fut.onDone(MappingExchangeResult.createSuccessfulResult(mappedClsName));
+                else if (stopping)
+                    fut.onDone(MappingExchangeResult.createExchangeDisabledResult());
+
+                return fut;
+            }
+        }
+
+        DiscoveryCustomMessage msg = new MappingProposedMessage(item, discoMgr.localNode().id());
+        discoMgr.sendCustomEvent(msg);
+
+        return fut;
+    }
+
+    /**
+     * @param item Item.
+     * @param cache Cache.
+     */
+    public GridFutureAdapter<MappingExchangeResult> requestMapping(
+            MarshallerMappingItem item,
+            ConcurrentMap<Integer, MappedName> cache
+    ) {
+        ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
+
+        ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut);
+
+        if (oldFut != null)
+            return oldFut;
+
+        MappedName mappedName = cache.get(item.typeId());
+
+        if (mappedName != null) {
+            newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
+
+            return newFut;
+        }
+
+        newFut.requestMapping();
+
+        return newFut;
+    }
+
+    /**
+     * @param item Item.
+     * @param mappedClsName Mapped class name.
+     */
+    private IgniteCheckedException duplicateMappingException(MarshallerMappingItem item, String mappedClsName) {
+        return new IgniteCheckedException("Duplicate ID [platformId="
+                + item.platformId()
+                + ", typeId="
+                + item.typeId()
+                + ", oldCls="
+                + mappedClsName
+                + ", newCls="
+                + item.className() + "]");
+    }
+
+    /** */
+    public void markStopping() {
+        stopping = true;
+    }
+
+    /** */
+    public boolean stopping() {
+        return stopping;
+    }
+
+    /**
+     * Future to wait for mapping exchange result to arrive. Removes itself from map when completed.
+     */
+    private class MappingExchangeResultFuture extends GridFutureAdapter<MappingExchangeResult> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final MarshallerMappingItem mappingItem;
+
+        /**
+         * @param mappingItem Mapping item.
+         */
+        private MappingExchangeResultFuture(MarshallerMappingItem mappingItem) {
+            this.mappingItem = mappingItem;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable MappingExchangeResult res, @Nullable Throwable err) {
+            assert res != null;
+
+            boolean done = super.onDone(res, null);
+
+            if (done)
+                mappingExchSyncMap.remove(mappingItem, this);
+
+            return done;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java
new file mode 100644
index 0000000..ba2afce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Client node receives discovery messages in asynchronous mode
+ * so it is possible that all server nodes already accepted new mapping but clients are unaware about it.
+ *
+ * In this case it is possible for client node to receive a request to perform some operation on such class
+ * client doesn't know about its mapping.
+ * Upon receiving such request client sends an explicit {@link MissingMappingRequestMessage} mapping request
+ * to one of server nodes using CommunicationSPI and waits for {@link MissingMappingResponseMessage} response.
+ *
+ * If server node where mapping request was sent to leaves the cluster for some reason
+ * mapping request gets automatically resent to the next alive server node in topology.
+ */
+public class MissingMappingRequestMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private byte platformId;
+
+    /** */
+    private int typeId;
+
+    /**
+     * Default constructor.
+     */
+    public MissingMappingRequestMessage() {
+        //No-op.
+    }
+
+    /**
+     * @param platformId Platform id.
+     * @param typeId Type id.
+     */
+    MissingMappingRequestMessage(byte platformId, int typeId) {
+        this.platformId = platformId;
+        this.typeId = typeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("platformId", platformId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("typeId", typeId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                platformId = reader.readByte("platformId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                typeId = reader.readInt("typeId");
+
+                if (!reader.isLastRead())
+                    return false;
+        }
+
+        return reader.afterMessageRead(MissingMappingRequestMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 120;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** */
+    public byte platformId() {
+        return platformId;
+    }
+
+    /** */
+    public int typeId() {
+        return typeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MissingMappingRequestMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java
new file mode 100644
index 0000000..d468302
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * On receiving a {@link MissingMappingRequestMessage} mapping request server node looks up class name
+ * for requested platformId and typeId in its local marshaller cache and sends back
+ * a {@link MissingMappingResponseMessage} mapping response with resolved class name.
+ */
+public class MissingMappingResponseMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private byte platformId;
+
+    /** */
+    private int typeId;
+
+    /** */
+    private String clsName;
+
+    /**
+     * Default constructor.
+     */
+    public MissingMappingResponseMessage() {
+    }
+
+    /**
+     * @param platformId Platform id.
+     * @param typeId Type id.
+     * @param clsName Class name.
+     */
+    MissingMappingResponseMessage(byte platformId, int typeId, String clsName) {
+        this.platformId = platformId;
+        this.typeId = typeId;
+        this.clsName = clsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("platformId", platformId))
+                    return false;
+
+                writer.incrementState();
+            case 1:
+                if (!writer.writeInt("typeId", typeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeString("clsName", clsName))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                platformId = reader.readByte("platformId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                typeId = reader.readInt("typeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                clsName = reader.readString("clsName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(MissingMappingResponseMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 121;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     *
+     */
+    public byte platformId() {
+        return platformId;
+    }
+
+    /**
+     *
+     */
+    public int typeId() {
+        return typeId;
+    }
+
+    /**
+     *
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MissingMappingResponseMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 526e222..1a45f7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -17,6 +17,22 @@
 
 package org.apache.ignite.internal.processors.platform.utils;
 
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.security.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -42,8 +58,8 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -51,23 +67,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.logger.NullLogger;
 import org.jetbrains.annotations.Nullable;
 
-import javax.cache.CacheException;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import java.math.BigDecimal;
-import java.security.Timestamp;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
 
 /**
@@ -844,25 +843,16 @@ public class PlatformUtils {
      */
     @SuppressWarnings("deprecation")
     public static GridBinaryMarshaller marshaller() {
-        try {
-            IgniteConfiguration cfg = new IgniteConfiguration();
-
-            BinaryContext ctx =
-                new BinaryContext(BinaryNoopMetadataHandler.instance(), cfg, new NullLogger());
-
-            BinaryMarshaller marsh = new BinaryMarshaller();
+        BinaryContext ctx =
+            new BinaryContext(BinaryNoopMetadataHandler.instance(), new IgniteConfiguration(), new NullLogger());
 
-            String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome());
+        BinaryMarshaller marsh = new BinaryMarshaller();
 
-            marsh.setContext(new MarshallerContextImpl(workDir, null));
+        marsh.setContext(new MarshallerContextImpl(null));
 
-            ctx.configure(marsh, new IgniteConfiguration());
+        ctx.configure(marsh, new IgniteConfiguration());
 
-            return new GridBinaryMarshaller(ctx);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
+        return new GridBinaryMarshaller(ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
index 560d474..ffc1ded 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
@@ -39,8 +39,13 @@ import org.apache.ignite.plugin.Extension;
 import org.apache.ignite.plugin.ExtensionRegistry;
 import org.apache.ignite.plugin.PluginContext;
 import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.PLUGIN;
+
 /**
  *
  */
@@ -148,44 +153,87 @@ public class IgnitePluginProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.PLUGIN;
+        return PLUGIN;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId());
+
+        if (pluginsData != null)
+            dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsData);
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
-        HashMap<String, Serializable> discData = null;
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId());
+
+        if (pluginsData != null)
+            dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData);
+    }
+
+    /**
+     * @param joiningNodeId Joining node id.
+     */
+    private Serializable getDiscoveryData(UUID joiningNodeId) {
+        HashMap<String, Serializable> pluginsData = null;
 
         for (Map.Entry<String, PluginProvider> e : plugins.entrySet()) {
-            Serializable data = e.getValue().provideDiscoveryData(nodeId);
+            Serializable data = e.getValue().provideDiscoveryData(joiningNodeId);
 
             if (data != null) {
-                if (discData == null)
-                    discData = new HashMap<>();
+                if (pluginsData == null)
+                    pluginsData = new HashMap<>();
 
-                discData.put(e.getKey(), data);
+                pluginsData.put(e.getKey(), data);
             }
         }
 
-        return discData;
+        return pluginsData;
     }
 
     /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable data) {
-        Map<String, Serializable> discData = (Map<String, Serializable>)data;
+    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
+        if (data.hasJoiningNodeData()) {
+            Map<String, Serializable> pluginsData = (Map<String, Serializable>) data.joiningNodeData();
 
-        if (discData != null) {
-            for (Map.Entry<String, Serializable> e : discData.entrySet()) {
-                PluginProvider provider = plugins.get(e.getKey());
+            applyPluginsData(data.joiningNodeId(), pluginsData);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        Map<UUID, Serializable> nodeSpecificData = data.nodeSpecificData();
+
+        if (nodeSpecificData != null) {
+            UUID joiningNodeId = data.joiningNodeId();
+
+            for (Serializable v : nodeSpecificData.values()) {
+                if (v != null) {
+                    Map<String, Serializable> pluginsData = (Map<String, Serializable>) v;
 
-                if (provider != null)
-                    provider.receiveDiscoveryData(nodeId, e.getValue());
-                else
-                    U.warn(log, "Received discovery data for unknown plugin: " + e.getKey());
+                    applyPluginsData(joiningNodeId, pluginsData);
+                }
             }
         }
     }
 
     /**
+     * @param nodeId Node id.
+     * @param pluginsData Plugins data.
+     */
+    private void applyPluginsData(UUID nodeId, Map<String, Serializable> pluginsData) {
+        for (Map.Entry<String, Serializable> e : pluginsData.entrySet()) {
+            PluginProvider provider = plugins.get(e.getKey());
+
+            if (provider != null)
+                provider.receiveDiscoveryData(nodeId, e.getValue());
+            else
+                U.warn(log, "Received discovery data for unknown plugin: " + e.getKey());
+        }
+    }
+
+    /**
      * Print plugins information.
      */
     private void ackPluginsInfo() {
@@ -227,7 +275,7 @@ public class IgnitePluginProcessor extends GridProcessorAdapter {
         /**
          * @return Map extension interface to array of implementation.
          */
-        public Map<Class<?>, Object[]> createExtensionMap() {
+        Map<Class<?>, Object[]> createExtensionMap() {
             Map<Class<?>, Object[]> extensions = new HashMap<>(extensionsCollector.size() * 2, 0.5f);
 
             for (Map.Entry<Class<?>, List<Object>> entry : extensionsCollector.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 89140b9..f42815b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -118,11 +118,6 @@ public class PoolProcessor extends GridProcessorAdapter {
 
                 return ctx.utilityCachePool();
 
-            case GridIoPolicy.MARSH_CACHE_POOL:
-                assert ctx.marshallerCachePool() != null : "Marshaller cache pool is not configured.";
-
-                return ctx.marshallerCachePool();
-
             case GridIoPolicy.IGFS_POOL:
                 assert ctx.getIgfsExecutorService() != null : "IGFS pool is not configured.";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..986fff7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -69,6 +69,8 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
+import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1580,12 +1582,15 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 if (evt instanceof DiscoveryCustomEvent) {
                     DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
-
                     if (msg instanceof CacheAffinityChangeMessage) {
                         if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
                             return;
                     }
+
+                    if (msg instanceof MappingProposedMessage || msg instanceof MappingAcceptedMessage)
+                        return;
+
+                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
                 }
                 else
                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
index 252ddd6..6898c36 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
@@ -26,23 +26,36 @@ public interface MarshallerContext {
     /**
      * Registers class with provided type ID.
      *
-     * @param id Type ID.
-     * @param cls Class.
+     * @param typeId Type ID.
+     * @param platformId id of a platform to register class.
+     * @param clsName class name.
      * @return Whether class was registered.
      * @throws IgniteCheckedException In case of error.
      */
-    public boolean registerClass(int id, Class cls) throws IgniteCheckedException;
+    public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException;
 
     /**
      * Gets class for provided type ID.
      *
-     * @param id Type ID.
+     * @param typeId Type ID.
      * @param ldr Class loader.
      * @return Class.
      * @throws ClassNotFoundException If class was not found.
      * @throws IgniteCheckedException In case of any other error.
      */
-    public Class getClass(int id, ClassLoader ldr) throws ClassNotFoundException, IgniteCheckedException;
+    public Class getClass(int typeId, ClassLoader ldr) throws ClassNotFoundException, IgniteCheckedException;
+
+
+    /**
+     * Gets class name for provided (platformId, typeId) pair.
+     *
+     * @param platformId id of a platform the class was registered for.
+     * @param typeId Type ID.
+     * @return Class name
+     * @throws ClassNotFoundException If class was not found.
+     * @throws IgniteCheckedException In case of any other error.
+     */
+    public String getClassName(byte platformId, int typeId) throws ClassNotFoundException, IgniteCheckedException;
 
     /**
      * Checks whether the given type is a system one - JDK class or Ignite class.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
index 923f385..60a627a 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.marshaller.MarshallerContext;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
+
 /**
  * Miscellaneous utility methods to facilitate {@link OptimizedMarshaller}.
  */
@@ -159,7 +161,7 @@ class OptimizedMarshallerUtils {
         try {
             mapOff = GridUnsafe.objectFieldOffset(HashSet.class.getDeclaredField("map"));
         }
-        catch (NoSuchFieldException e) {
+        catch (NoSuchFieldException ignored) {
             try {
                 // Workaround for legacy IBM JRE.
                 mapOff = GridUnsafe.objectFieldOffset(HashSet.class.getDeclaredField("backingMap"));
@@ -203,7 +205,7 @@ class OptimizedMarshallerUtils {
             boolean registered;
 
             try {
-                registered = ctx.registerClass(typeId, cls);
+                registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName());
             }
             catch (IgniteCheckedException e) {
                 throw new IOException("Failed to register class: " + cls.getName(), e);
@@ -246,7 +248,7 @@ class OptimizedMarshallerUtils {
      * Gets descriptor for provided ID.
      *
      * @param clsMap Class descriptors by class map.
-     * @param id ID.
+     * @param typeId ID.
      * @param ldr Class loader.
      * @param ctx Context.
      * @param mapper ID mapper.
@@ -256,17 +258,17 @@ class OptimizedMarshallerUtils {
      */
     static OptimizedClassDescriptor classDescriptor(
         ConcurrentMap<Class, OptimizedClassDescriptor> clsMap,
-        int id,
+        int typeId,
         ClassLoader ldr,
         MarshallerContext ctx,
         OptimizedMarshallerIdMapper mapper) throws IOException, ClassNotFoundException {
         Class cls;
 
         try {
-            cls = ctx.getClass(id, ldr);
+            cls = ctx.getClass(typeId, ldr);
         }
         catch (IgniteCheckedException e) {
-            throw new IOException("Failed to resolve class for ID: " + id, e);
+            throw new IOException("Failed to resolve class for ID: " + typeId, e);
         }
 
         OptimizedClassDescriptor desc = clsMap.get(cls);
@@ -307,7 +309,7 @@ class OptimizedMarshallerUtils {
                     }
                 }
             }
-            catch (NoSuchFieldException e) {
+            catch (NoSuchFieldException ignored) {
                 // No-op.
             }
             catch (IllegalAccessException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
new file mode 100644
index 0000000..96df255
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi.discovery;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.GridComponent;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides interface for {@link GridComponent} to collect and exchange discovery data both on
+ * joining node and on cluster nodes.
+ *
+ * It only organizes interaction with components and doesn't provide any capabilities of converting collected data
+ * into formats eligible for transmitting over media (like marshalling, compressing and so on).
+ */
+public class DiscoveryDataBag {
+    /**
+     * Facade interface representing {@link DiscoveryDataBag} object with discovery data from joining node.
+     */
+    public interface JoiningNodeDiscoveryData {
+        /** */
+        UUID joiningNodeId();
+
+        /** */
+        boolean hasJoiningNodeData();
+
+        /** */
+        Serializable joiningNodeData();
+    }
+
+    /**
+     * Facade interface representing {@link DiscoveryDataBag} object with discovery data collected in the grid.
+     */
+    public interface GridDiscoveryData {
+        /** */
+        UUID joiningNodeId();
+
+        /** */
+        Serializable commonData();
+
+        /** */
+        Map<UUID, Serializable> nodeSpecificData();
+    }
+
+    /**
+     *
+     */
+    private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscoveryData {
+        /** */
+        private int cmpId;
+
+        /** {@inheritDoc} */
+        @Override public UUID joiningNodeId() {
+            return joiningNodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasJoiningNodeData() {
+            return joiningNodeData.containsKey(cmpId);
+        }
+
+        /** {@inheritDoc} */
+        @Override @Nullable public Serializable joiningNodeData() {
+            return joiningNodeData.get(cmpId);
+        }
+
+        /**
+         * @param cmpId Cmp id.
+         */
+        private void setComponentId(int cmpId) {
+            this.cmpId = cmpId;
+        }
+    }
+
+    /**
+     *
+     */
+    private final class GridDiscoveryDataImpl implements GridDiscoveryData {
+        /** */
+        private int cmpId;
+
+        /** */
+        private Map<UUID, Serializable> nodeSpecificData
+                = new LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size());
+
+        /** {@inheritDoc} */
+        @Override public UUID joiningNodeId() {
+            return joiningNodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override @Nullable public Serializable commonData() {
+            if (commonData != null)
+                return commonData.get(cmpId);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<UUID, Serializable> nodeSpecificData() {
+            return nodeSpecificData;
+        }
+
+        /**
+         * @param cmpId component ID.
+         */
+        private void componentId(int cmpId) {
+            this.cmpId = cmpId;
+
+            reinitNodeSpecData(cmpId);
+        }
+
+        /**
+         * @param cmpId component ID.
+         */
+        private void reinitNodeSpecData(int cmpId) {
+            nodeSpecificData.clear();
+
+            for (Map.Entry<UUID, Map<Integer, Serializable>> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) {
+                if (e.getValue() != null && e.getValue().containsKey(cmpId))
+                    nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId));
+            }
+        }
+    }
+
+    /** Used for collecting node-specific data from component.
+     * As component may not know about nodeId it is running on, when component adds node-specific data,
+     * it is firstly collected under this key and then moved to another map with a correct UUID key.
+     */
+    private static final UUID DEFAULT_KEY = null;
+
+    /** */
+    private UUID joiningNodeId;
+
+    /**
+     * Component IDs with already initialized common discovery data.
+     */
+    private Set<Integer> cmnDataInitializedCmps;
+
+    /** */
+    private Map<Integer, Serializable> joiningNodeData = new HashMap<>();
+
+    /** */
+    private Map<Integer, Serializable> commonData = new HashMap<>();
+
+    /** */
+    private Map<UUID, Map<Integer, Serializable>> nodeSpecificData = new LinkedHashMap<>();
+
+    /** */
+    private JoiningNodeDiscoveryDataImpl newJoinerData;
+
+    /** */
+    private GridDiscoveryDataImpl gridData;
+
+    /**
+     * @param joiningNodeId Joining node id.
+     */
+    public DiscoveryDataBag(UUID joiningNodeId) {
+        this.joiningNodeId = joiningNodeId;
+    }
+
+    /**
+     * @param joiningNodeId Joining node id.
+     * @param cmnDataInitializedCmps Component IDs with already initialized common discovery data.
+     */
+    public DiscoveryDataBag(UUID joiningNodeId, Set<Integer> cmnDataInitializedCmps) {
+        this.joiningNodeId = joiningNodeId;
+        this.cmnDataInitializedCmps = cmnDataInitializedCmps;
+    }
+
+    /**
+     *
+     */
+    public UUID joiningNodeId() {
+        return joiningNodeId;
+    }
+
+    /**
+     * @param cmpId component ID.
+     */
+    public GridDiscoveryData gridDiscoveryData(int cmpId) {
+        if (gridData == null)
+            gridData = new GridDiscoveryDataImpl();
+
+        gridData.componentId(cmpId);
+
+        return gridData;
+    }
+
+    /**
+     * @param cmpId component ID.
+     */
+    public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) {
+        if (newJoinerData == null)
+            newJoinerData = new JoiningNodeDiscoveryDataImpl();
+
+        newJoinerData.setComponentId(cmpId);
+
+        return newJoinerData;
+    }
+
+    /**
+     * @param cmpId component ID.
+     * @param data Data.
+     */
+    public void addJoiningNodeData(Integer cmpId, Serializable data) {
+        joiningNodeData.put(cmpId, data);
+    }
+
+    /**
+     * @param cmpId component ID.
+     * @param data Data.
+     */
+    public void addGridCommonData(Integer cmpId, Serializable data) {
+        commonData.put(cmpId, data);
+    }
+
+    /**
+     * @param cmpId component ID.
+     * @param data Data.
+     */
+    public void addNodeSpecificData(Integer cmpId, Serializable data) {
+        if (!nodeSpecificData.containsKey(DEFAULT_KEY))
+            nodeSpecificData.put(DEFAULT_KEY, new HashMap<Integer, Serializable>());
+
+        nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data);
+    }
+
+    /**
+     * @param cmpId component ID.
+     */
+    public boolean commonDataCollectedFor(Integer cmpId) {
+        assert cmnDataInitializedCmps != null;
+
+        return cmnDataInitializedCmps.contains(cmpId);
+    }
+
+    /**
+     * @param joinNodeData Joining node data.
+     */
+    public void joiningNodeData(Map<Integer, Serializable> joinNodeData) {
+        joiningNodeData.putAll(joinNodeData);
+    }
+
+    /**
+     * @param cmnData Cmn data.
+     */
+    public void commonData(Map<Integer, Serializable> cmnData) {
+        commonData.putAll(cmnData);
+    }
+
+    /**
+     * @param nodeSpecData Node specific data.
+     */
+    public void nodeSpecificData(Map<UUID, Map<Integer, Serializable>> nodeSpecData) {
+        nodeSpecificData.putAll(nodeSpecData);
+    }
+
+    /**
+     *
+     */
+    public Map<Integer, Serializable> joiningNodeData() {
+        return joiningNodeData;
+    }
+
+    /**
+     *
+     */
+    public Map<Integer, Serializable> commonData() {
+        return commonData;
+    }
+
+    /**
+     *
+     */
+    @Nullable public Map<Integer, Serializable> localNodeSpecificData() {
+        return nodeSpecificData.get(DEFAULT_KEY);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index e591470..d56c943 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -17,10 +17,6 @@
 
 package org.apache.ignite.spi.discovery;
 
-import java.io.Serializable;
-import java.util.Map;
-import java.util.UUID;
-
 /**
  * Handler for initial data exchange between Ignite nodes. Data exchange
  * is initiated by a new node when it tries to join topology and finishes
@@ -32,17 +28,14 @@ public interface DiscoverySpiDataExchange {
      * on new node that joins topology to transfer its data to existing
      * nodes and on all existing nodes to transfer their data to new node.
      *
-     * @param joiningNodeId ID of new node that joins topology.
-     * @return Collection of discovery data objects from different components.
+     * @param dataBag {@link DiscoveryDataBag} object managing discovery data during node joining process.
      */
-    public Map<Integer, Serializable> collect(UUID joiningNodeId);
+    public DiscoveryDataBag collect(DiscoveryDataBag dataBag);
 
     /**
      * Notifies discovery manager about data received from remote node.
      *
-     * @param joiningNodeId ID of new node that joins topology.
-     * @param nodeId ID of the node provided data.
-     * @param data Collection of discovery data objects from different components.
+     * @param dataBag Collection of discovery data objects from different components.
      */
-    public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data);
+    public void onExchange(DiscoveryDataBag dataBag);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 8928f28..a5cedf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
@@ -623,7 +624,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (locNode.order() > 0)
                         node = locNode.clientReconnectNode();
 
-                    msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+                    msg = new TcpDiscoveryJoinRequestMessage(
+                            node,
+                            spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
                 }
                 else
                     msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
@@ -1048,7 +1051,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            TcpDiscoveryAbstractMessage msg;
+            TcpDiscoveryAbstractMessage msg = null;
 
             while (!Thread.currentThread().isInterrupted()) {
                 Socket sock;
@@ -1062,7 +1065,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    msg = queue.poll();
+                    if (msg == null)
+                        msg = queue.poll();
 
                     if (msg == null) {
                         mux.wait();
@@ -1661,11 +1665,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (log.isDebugEnabled())
                             log.debug("Added new node to topology: " + node);
 
-                        Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+                        DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
 
-                        if (data != null)
-                            spi.onExchange(newNodeId, newNodeId, data,
-                                U.resolveClassLoader(spi.ignite().configuration()));
+                        if (dataPacket != null && dataPacket.hasJoiningNodeData())
+                            spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
                     }
                 }
                 else {
@@ -1684,13 +1687,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             if (getLocalNodeId().equals(msg.nodeId())) {
                 if (joining()) {
-                    Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
+                    DiscoveryDataPacket dataContainer = msg.clientDiscoData();
 
-                    if (dataMap != null) {
-                        for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
-                            spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
-                                U.resolveClassLoader(spi.ignite().configuration()));
-                    }
+                    if (dataContainer != null)
+                        spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration()));
 
                     locNode.setAttributes(msg.clientNodeAttributes());
                     locNode.visible(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7cc35e4..df782e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -100,6 +100,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState;
@@ -964,7 +965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     @SuppressWarnings({"BusyWait"})
     private boolean sendJoinRequestMessage() throws IgniteSpiException {
         TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
-            spi.collectExchangeData(getLocalNodeId()));
+            spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
 
         // Time when it has been detected, that addresses from IP finder do not respond.
         long noResStart = 0;
@@ -1933,7 +1934,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 // Do not need this data for client reconnect.
-                addedMsg.oldNodesDiscoveryData(null);
+                if (addedMsg.gridDiscoveryData() != null)
+                    addedMsg.clearDiscoveryData();
             }
             else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
                 TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
@@ -1943,44 +1945,25 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     msg = addFinishMsg;
 
-                    Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
+                    DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData();
 
-                    Set<UUID> replaced = null;
+                    Set<Integer> mrgdCmnData = new HashSet<>();
+                    Set<UUID> mrgdSpecData = new HashSet<>();
+
+                    boolean allMerged = false;
 
                     for (TcpDiscoveryAbstractMessage msg0 : msgs) {
+
                         if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) {
-                            Map<UUID, Map<Integer, byte[]>> existingDiscoData =
+                            DiscoveryDataPacket existingDiscoData =
                                 ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
 
-                            // Check if already stored message contains the same data to do not store copies multiple times.
-                            if (existingDiscoData != null) {
-                                for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
-                                    UUID nodeId = e.getKey();
-
-                                    if (F.contains(replaced, nodeId))
-                                        continue;
-
-                                    Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey());
-
-                                    if (existingData != null && mapsEqual(e.getValue(), existingData)) {
-                                        e.setValue(existingData);
-
-                                        if (replaced == null)
-                                            replaced = new HashSet<>();
-
-                                        boolean add = replaced.add(nodeId);
-
-                                        assert add;
-
-                                        if (replaced.size() == discoData.size())
-                                            break;
-                                    }
-                                }
-
-                                if (replaced != null && replaced.size() == discoData.size())
-                                    break;
-                            }
+                            if (existingDiscoData != null)
+                                allMerged = discoData.mergeDataFrom(existingDiscoData, mrgdCmnData, mrgdSpecData);
                         }
+
+                        if (allMerged)
+                            break;
                     }
                 }
             }
@@ -2011,29 +1994,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * @param m1 Map 1.
-         * @param m2 Map 2.
-         * @return {@code True} if maps contain the same data.
-         */
-        private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
-            if (m1 == m2)
-                return true;
-
-            if (m1.size() == m2.size()) {
-                for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
-                    byte[] data = m2.get(e.getKey());
-
-                    if (!Arrays.equals(e.getValue(), data))
-                        return false;
-                }
-
-                return true;
-            }
-
-            return false;
-        }
-
-        /**
          * Gets messages starting from provided ID (exclusive). If such
          * message is not found, {@code null} is returned (this indicates
          * a failure condition when it was already removed from queue).
@@ -3748,8 +3708,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Internal order has been assigned to node: " + node);
 
+                DiscoveryDataPacket data = msg.gridDiscoveryData();
+
                 TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
-                    node, msg.discoveryData(), spi.gridStartTime);
+                    node, data, spi.gridStartTime);
 
                 nodeAddedMsg.client(msg.client());
 
@@ -3970,7 +3932,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         node.id());
 
                     if (node.isClient()) {
-                        addFinishMsg.clientDiscoData(msg.oldNodesDiscoveryData());
+                        addFinishMsg.clientDiscoData(msg.gridDiscoveryData());
 
                         addFinishMsg.clientNodeAttributes(node.attributes());
                     }
@@ -4100,12 +4062,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (topChanged) {
                     assert !node.visible() : "Added visible node [node=" + node + ", locNode=" + locNode + ']';
 
-                    Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+                    DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
 
-                    if (data != null)
-                        spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()));
+                    if (dataPacket.hasJoiningNodeData())
+                        spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
 
-                    msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+                    spi.collectExchangeData(dataPacket);
 
                     processMessageFailedNodes(msg);
                 }
@@ -4116,8 +4078,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified() && locNodeId.equals(node.id())) {
-                // Discovery data.
-                Map<UUID, Map<Integer, byte[]>> dataMap;
+                DiscoveryDataPacket dataPacket;
 
                 synchronized (mux) {
                     if (spiState == CONNECTING && locNode.internalOrder() != node.internalOrder()) {
@@ -4194,7 +4155,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             if (log.isDebugEnabled())
                                 log.debug("Restored topology from node added message: " + ring);
 
-                            dataMap = msg.oldNodesDiscoveryData();
+                            dataPacket = msg.gridDiscoveryData();
 
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
@@ -4227,11 +4188,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 // Notify outside of synchronized block.
-                if (dataMap != null) {
-                    for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
-                        spi.onExchange(node.id(), entry.getKey(), entry.getValue(),
-                            U.resolveClassLoader(spi.ignite().configuration()));
-                }
+                if (dataPacket != null)
+                    spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
 
                 processMessageFailedNodes(msg);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index db85cc4..1f2f328 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -22,7 +22,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -51,7 +50,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -77,6 +75,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.IgniteSpiVersionCheckException;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -85,6 +84,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -1679,66 +1679,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
-     * @param nodeId Node ID.
-     * @return Marshalled exchange data.
+     * @param dataPacket Data packet.
      */
-    protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) {
+    DiscoveryDataPacket collectExchangeData(DiscoveryDataPacket dataPacket) {
         if (locNode.isDaemon())
-            return Collections.emptyMap();
+            return dataPacket;
 
-        Map<Integer, Serializable> data = exchange.collect(nodeId);
+        assert dataPacket != null;
+        assert dataPacket.joiningNodeId() != null;
 
-        assert data != null;
-
-        Map<Integer, byte[]> data0 = U.newHashMap(data.size());
+        //create data bag, pass it to exchange.collect
+        DiscoveryDataBag dataBag = dataPacket.bagForDataCollection();
 
-        for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
-            try {
-                byte[] bytes = U.marshal(marshaller(), entry.getValue());
+        exchange.collect(dataBag);
 
-                data0.put(entry.getKey(), bytes);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to marshal discovery data " +
-                    "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
-            }
-        }
+        //marshall collected bag into packet, return packet
+        if (dataPacket.joiningNodeId().equals(locNode.id()))
+            dataPacket.marshalJoiningNodeData(dataBag, marshaller(), log);
+        else
+            dataPacket.marshalGridNodeData(dataBag, locNode.id(), marshaller(), log);
 
-        return data0;
+        return dataPacket;
     }
 
     /**
-     * @param joiningNodeID Joining node ID.
-     * @param nodeId Remote node ID for which data is provided.
-     * @param data Collection of marshalled discovery data objects from different components.
+     * @param dataPacket object holding discovery data collected during discovery process.
      * @param clsLdr Class loader.
      */
-    protected void onExchange(UUID joiningNodeID,
-        UUID nodeId,
-        Map<Integer, byte[]> data,
-        ClassLoader clsLdr)
-    {
+    protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) {
         if (locNode.isDaemon())
             return;
 
-        Map<Integer, Serializable> data0 = U.newHashMap(data.size());
+        assert dataPacket != null;
+        assert dataPacket.joiningNodeId() != null;
 
-        for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
-            try {
-                Serializable compData = U.unmarshal(marshaller(), entry.getValue(), clsLdr);
+        DiscoveryDataBag dataBag;
+
+        if (dataPacket.joiningNodeId().equals(locNode.id()))
+            dataBag = dataPacket.unmarshalGridData(marshaller(), clsLdr, locNode.isClient(), log);
+        else
+            dataBag = dataPacket.unmarshalJoiningNodeData(marshaller(), clsLdr, locNode.isClient(), log);
 
-                data0.put(entry.getKey(), compData);
-            }
-            catch (IgniteCheckedException e) {
-                if (GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC.ordinal() == entry.getKey() &&
-                    X.hasCause(e, ClassNotFoundException.class) && locNode.isClient())
-                    U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
-                else
-                    U.error(log, "Failed to unmarshal discovery data for component: "  + entry.getKey(), e);
-            }
-        }
 
-        exchange.onExchange(joiningNodeID, nodeId, data0);
+        exchange.onExchange(dataBag);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
new file mode 100644
index 0000000..1134de6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
+
+/**
+ * Carries discovery data in marshalled form
+ * and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects.
+ */
+public class DiscoveryDataPacket implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final UUID joiningNodeId;
+
+    /** */
+    private Map<Integer, byte[]> joiningNodeData = new HashMap<>();
+
+    /** */
+    private Map<Integer, byte[]> commonData = new HashMap<>();
+
+    /** */
+    private Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new LinkedHashMap<>();
+
+    /**
+     * @param joiningNodeId Joining node id.
+     */
+    public DiscoveryDataPacket(UUID joiningNodeId) {
+        this.joiningNodeId = joiningNodeId;
+    }
+
+    /**
+     *
+     */
+    public UUID joiningNodeId() {
+        return joiningNodeId;
+    }
+
+    /**
+     * @param bag Bag.
+     * @param nodeId Node id.
+     * @param marsh Marsh.
+     * @param log Logger.
+     */
+    public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, IgniteLogger log) {
+        marshalData(bag.commonData(), commonData, marsh, log);
+
+        Map<Integer, Serializable> locNodeSpecificData = bag.localNodeSpecificData();
+
+        if (locNodeSpecificData != null) {
+            Map<Integer, byte[]> marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size());
+
+            marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, log);
+
+            filterDuplicatedData(marshLocNodeSpecificData);
+
+            if (!marshLocNodeSpecificData.isEmpty())
+                nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
+        }
+    }
+
+    /**
+     * @param bag Bag.
+     * @param marsh Marsh.
+     * @param log Logger.
+     */
+    public void marshalJoiningNodeData(DiscoveryDataBag bag, Marshaller marsh, IgniteLogger log) {
+        marshalData(bag.joiningNodeData(), joiningNodeData, marsh, log);
+    }
+
+    /**
+     * @param marsh Marsh.
+     * @param clsLdr Class loader.
+     * @param clientNode Client node.
+     * @param log Logger.
+     */
+    public DiscoveryDataBag unmarshalGridData(
+            Marshaller marsh,
+            ClassLoader clsLdr,
+            boolean clientNode,
+            IgniteLogger log
+    ) {
+        DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId);
+
+        if (commonData != null && !commonData.isEmpty()) {
+            Map<Integer, Serializable> unmarshCommonData = unmarshalData(commonData, marsh, clsLdr, clientNode, log);
+
+            dataBag.commonData(unmarshCommonData);
+        }
+
+        if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
+            Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size());
+
+            for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry : nodeSpecificData.entrySet()) {
+                Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
+
+                if (nodeBinData == null || nodeBinData.isEmpty())
+                    continue;
+
+                Map<Integer, Serializable> unmarshData = unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log);
+
+                unmarshNodeSpecData.put(nodeBinEntry.getKey(), unmarshData);
+            }
+
+            dataBag.nodeSpecificData(unmarshNodeSpecData);
+        }
+
+        return dataBag;
+    }
+
+    /**
+     * @param marsh Marsh.
+     * @param clsLdr Class loader.
+     * @param clientNode Client node.
+     * @param log Logger.
+     */
+    public DiscoveryDataBag unmarshalJoiningNodeData(
+            Marshaller marsh,
+            ClassLoader clsLdr,
+            boolean clientNode,
+            IgniteLogger log
+    ) {
+        DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId);
+
+        if (joiningNodeData != null && !joiningNodeData.isEmpty()) {
+            Map<Integer, Serializable> unmarshJoiningNodeData = unmarshalData(
+                    joiningNodeData,
+                    marsh,
+                    clsLdr,
+                    clientNode,
+                    log);
+
+            dataBag.joiningNodeData(unmarshJoiningNodeData);
+        }
+
+        return dataBag;
+    }
+
+    /**
+     *
+     */
+    public boolean hasJoiningNodeData() {
+        return joiningNodeData != null && !joiningNodeData.isEmpty();
+    }
+
+    /**
+     * @param nodeId Node id.
+     */
+    public boolean hasDataFromNode(UUID nodeId) {
+        return nodeSpecificData.containsKey(nodeId);
+    }
+
+    /**
+     * @param existingDataPacket Existing data packet.
+     * @param mrgdCmnDataKeys Mrgd cmn data keys.
+     * @param mrgdSpecifDataKeys Mrgd specif data keys.
+     */
+    public boolean mergeDataFrom(
+            DiscoveryDataPacket existingDataPacket,
+            Collection<Integer> mrgdCmnDataKeys,
+            Collection<UUID> mrgdSpecifDataKeys
+    ) {
+        if (commonData.size() != mrgdCmnDataKeys.size()) {
+            for (Map.Entry<Integer, byte[]> e : commonData.entrySet()) {
+                if (!mrgdCmnDataKeys.contains(e.getKey())) {
+                    byte[] data = existingDataPacket.commonData.get(e.getKey());
+
+                    if (data != null && Arrays.equals(e.getValue(), data)) {
+                        e.setValue(data);
+
+                        boolean add = mrgdCmnDataKeys.add(e.getKey());
+
+                        assert add;
+
+                        if (mrgdCmnDataKeys.size() == commonData.size())
+                            break;
+                    }
+                }
+            }
+        }
+
+        if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
+            for (Map.Entry<UUID, Map<Integer, byte[]>> e : nodeSpecificData.entrySet()) {
+                if (!mrgdSpecifDataKeys.contains(e.getKey())) {
+                    Map<Integer, byte[]> data = existingDataPacket.nodeSpecificData.get(e.getKey());
+
+                    if (data != null && mapsEqual(e.getValue(), data)) {
+                        e.setValue(data);
+
+                        boolean add = mrgdSpecifDataKeys.add(e.getKey());
+
+                        assert add;
+
+                        if (mrgdSpecifDataKeys.size() == nodeSpecificData.size())
+                            break;
+                    }
+                }
+            }
+        }
+
+        return (mrgdCmnDataKeys.size() == commonData.size()) && (mrgdSpecifDataKeys.size() == nodeSpecificData.size());
+    }
+
+    /**
+     * @param m1 first map to compare.
+     * @param m2 second map to compare.
+     */
+    private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
+        if (m1 == m2)
+            return true;
+
+        if (m1.size() == m2.size()) {
+            for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
+                byte[] data = m2.get(e.getKey());
+
+                if (!Arrays.equals(e.getValue(), data))
+                    return false;
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param src Source.
+     * @param marsh Marsh.
+     * @param clsLdr Class loader.
+     * @param log Logger.
+     */
+    private Map<Integer, Serializable> unmarshalData(
+            Map<Integer, byte[]> src,
+            Marshaller marsh,
+            ClassLoader clsLdr,
+            boolean clientNode,
+            IgniteLogger log
+    ) {
+        Map<Integer, Serializable> res = U.newHashMap(src.size());
+
+        for (Map.Entry<Integer, byte[]> binEntry : src.entrySet()) {
+            try {
+                Serializable compData = marsh.unmarshal(binEntry.getValue(), clsLdr);
+                res.put(binEntry.getKey(), compData);
+            }
+            catch (IgniteCheckedException e) {
+                if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() &&
+                        X.hasCause(e, ClassNotFoundException.class) && clientNode)
+                    U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
+                else
+                    U.error(log, "Failed to unmarshal discovery data for component: "  + binEntry.getKey(), e);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * @param src Source.
+     * @param target Target.
+     * @param marsh Marsh.
+     * @param log Logger.
+     */
+    private void marshalData(
+            Map<Integer, Serializable> src,
+            Map<Integer, byte[]> target,
+            Marshaller marsh,
+            IgniteLogger log
+    ) {
+        //may happen if nothing was collected from components,
+        // corresponding map (for common data or for node specific data) left null
+        if (src == null)
+            return;
+
+        for (Map.Entry<Integer, Serializable> entry : src.entrySet()) {
+            try {
+                target.put(entry.getKey(), marsh.marshal(entry.getValue()));
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to marshal discovery data " +
+                        "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
+            }
+        }
+    }
+
+    /**
+     * TODO https://issues.apache.org/jira/browse/IGNITE-4435
+     */
+    private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
+        for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
+            Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
+
+            while (it.hasNext()) {
+                Map.Entry<Integer, byte[]> discoDataEntry = it.next();
+
+                byte[] curData = existingData.get(discoDataEntry.getKey());
+
+                if (Arrays.equals(curData, discoDataEntry.getValue()))
+                    it.remove();
+            }
+
+            if (discoData.isEmpty())
+                break;
+        }
+    }
+
+    /**
+     * Returns {@link DiscoveryDataBag} aware of components with already initialized common data
+     * (e.g. on nodes prior in cluster to the one where this method is called).
+     */
+    public DiscoveryDataBag bagForDataCollection() {
+        return new DiscoveryDataBag(joiningNodeId, commonData.keySet());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 22ffae8..24e3868 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import java.util.Map;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 
 /**
@@ -33,20 +33,20 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
     /** New node that wants to join the topology. */
     private final TcpDiscoveryNode node;
 
-    /** Discovery data. */
-    private final Map<Integer, byte[]> discoData;
+    /** Discovery data container. */
+    private final DiscoveryDataPacket dataPacket;
 
     /**
      * Constructor.
      *
      * @param node New node that wants to join.
-     * @param discoData Discovery data.
+     * @param dataPacket Discovery data.
      */
-    public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, Map<Integer, byte[]> discoData) {
+    public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, DiscoveryDataPacket dataPacket) {
         super(node.id());
 
         this.node = node;
-        this.discoData = discoData;
+        this.dataPacket = dataPacket;
     }
 
     /**
@@ -59,10 +59,10 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
     }
 
     /**
-     * @return Discovery data.
+     *
      */
-    public Map<Integer, byte[]> discoveryData() {
-        return discoData;
+    public DiscoveryDataPacket gridDiscoveryData() {
+        return dataPacket;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 80f4565..2c710b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
 
 /**
  * Sent by coordinator across the ring to finish node add process.
@@ -40,7 +40,7 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
      * TcpDiscoveryNodeAddFinishedMessage
      */
     @GridToStringExclude
-    private Map<UUID, Map<Integer, byte[]>> clientDiscoData;
+    private DiscoveryDataPacket clientDiscoData;
 
     /** */
     @GridToStringExclude
@@ -81,17 +81,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     /**
      * @return Discovery data for joined client.
      */
-    public Map<UUID, Map<Integer, byte[]>> clientDiscoData() {
+    public DiscoveryDataPacket clientDiscoData() {
         return clientDiscoData;
     }
 
     /**
      * @param clientDiscoData Discovery data for joined client.
      */
-    public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) {
+    public void clientDiscoData(DiscoveryDataPacket clientDiscoData) {
         this.clientDiscoData = clientDiscoData;
 
-        assert clientDiscoData == null || !clientDiscoData.containsKey(nodeId);
+        assert clientDiscoData == null || !clientDiscoData.hasDataFromNode(nodeId);
     }
 
     /**