You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/28 12:56:08 UTC

[1/2] ignite git commit: ignite-4296-1

Repository: ignite
Updated Branches:
  refs/heads/ignite-4296-1 39532a3ee -> 660f2a38b


ignite-4296-1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/efe38d45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/efe38d45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/efe38d45

Branch: refs/heads/ignite-4296-1
Commit: efe38d45fc5b70671ecb11c2310c95773bdb7385
Parents: 39532a3
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 28 14:55:26 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 28 14:55:26 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spi/discovery/tcp/ServerImpl.java   | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/efe38d45/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8927a32..8407e4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3759,6 +3759,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Internal order has been assigned to node: " + node);
 
+                log.info("Created TcpDiscoveryNodeAddedMessage [node=" + node.id() + ']');
+
                 TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
                     node, msg.discoveryData(), spi.gridStartTime);
 
@@ -4047,7 +4049,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     Long time = spi.stats.onRingMessageReceived(msg);
 
                     if (time != null)
-                        log.info("TcpDiscoveryNodeAddedMessage ring time: " + time);
+                        log.info("TcpDiscoveryNodeAddedMessage ring [time=" + time + ", node=" + node.id() + ']');
 
                     TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId,
                         node.id());
@@ -4069,6 +4071,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 msg.verify(locNodeId);
+
+                log.info("Process TcpDiscoveryNodeAddedMessage [node=" + node.id() + ']');
             }
             else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
                 // Local node already has node from message in local topology.
@@ -4359,7 +4363,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     Long time = spi.stats.onRingMessageReceived(msg);
 
                     if (time != null)
-                        log.info("TcpDiscoveryNodeAddFinishedMessage ring time: " + time);
+                        log.info("TcpDiscoveryNodeAddFinishedMessage ring [time=" + time + ", node=" + node.id() + ']');
 
                     addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
@@ -5249,7 +5253,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     Long time = spi.stats.onRingMessageReceived(msg);
 
                     if (time != null)
-                        log.info("TcpDiscoveryCustomEventMessage ring time: " + time);
+                        log.info("TcpDiscoveryCustomEventMessage ring [time=" + time + ", msg=" + msg + ']');
 
                     DiscoverySpiCustomMessage msgObj = null;
 


[2/2] ignite git commit: ignite-4296-1

Posted by sb...@apache.org.
ignite-4296-1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/660f2a38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/660f2a38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/660f2a38

Branch: refs/heads/ignite-4296-1
Commit: 660f2a38b4af5fa81117713c7ddd06cdb6b0781c
Parents: efe38d4
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 28 15:55:34 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 28 15:55:34 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 45 +++++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 84 +++++++++++++++++---
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 12 ++-
 .../TcpDiscoveryCustomEventMessage.java         |  6 +-
 .../TcpDiscoveryJoinRequestMessage.java         | 12 ++-
 .../messages/TcpDiscoveryNodeAddedMessage.java  | 10 +++
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  2 +-
 7 files changed, 145 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 7b8c029..40a5cf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -430,8 +430,12 @@ class ClientImpl extends TcpDiscoveryImpl {
             throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
 
         try {
-            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
-                U.marshal(spi.marshaller(), evt)));
+            byte[] bytes = U.marshal(spi.marshaller(), evt);
+
+            if (TcpDiscoverySpi.COMPRESS_MSGS)
+                bytes = U.zip(bytes);
+
+            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, bytes));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -624,7 +628,22 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (locNode.order() > 0)
                         node = locNode.clientReconnectNode();
 
-                    msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+                    Map<Integer, byte[]> discoData = spi.collectExchangeData(getLocalNodeId());
+                    byte[] discoDataBytes = null;
+
+                    if (TcpDiscoverySpi.COMPRESS_MSGS) {
+                        try {
+                            log.info("Compress join request data.");
+
+                            discoDataBytes = U.zip(spi.marshaller().marshal(discoData));
+                            discoData = null;
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteSpiException(e);
+                        }
+                    }
+
+                    msg = new TcpDiscoveryJoinRequestMessage(node, discoData, discoDataBytes);
                 }
                 else
                     msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
@@ -1664,9 +1683,18 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
+                        try {
+                            if (data == null && msg.newNodeDiscoveryDataBytes() != null)
+                                data = U.unmarshalZip(spi.marshaller(), msg.newNodeDiscoveryDataBytes(), U.resolveClassLoader(spi.ignite().configuration()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to unmarshall discovery data", e);
+                        }
+
                         if (data != null)
                             spi.onExchange(newNodeId, newNodeId, data,
-                                U.resolveClassLoader(spi.ignite().configuration()));
+                                U.resolveClassLoader(spi.ignite().configuration()),
+                                false);
                     }
                 }
                 else {
@@ -1689,8 +1717,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     if (dataMap != null) {
                         for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
-                            spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
-                                U.resolveClassLoader(spi.ignite().configuration()));
+                            spi.onExchange(getLocalNodeId(),
+                                entry.getKey(),
+                                entry.getValue(),
+                                U.resolveClassLoader(spi.ignite().configuration()),
+                                TcpDiscoverySpi.COMPRESS_MSGS);
                     }
 
                     locNode.setAttributes(msg.clientNodeAttributes());
@@ -1994,7 +2025,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (node != null && node.visible()) {
                         try {
                             DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
-                                U.resolveClassLoader(spi.ignite().configuration()));
+                                U.resolveClassLoader(spi.ignite().configuration()), TcpDiscoverySpi.COMPRESS_MSGS);
 
                             notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8407e4f..b1643ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -760,8 +760,13 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         try {
+            byte[] bytes = U.marshal(spi.marshaller(), evt);
+
+            if (TcpDiscoverySpi.COMPRESS_MSGS)
+                bytes = U.zip(bytes);
+
             msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
-                U.marshal(spi.marshaller(), evt)));
+                bytes));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -970,8 +975,24 @@ class ServerImpl extends TcpDiscoveryImpl {
      */
     @SuppressWarnings({"BusyWait"})
     private boolean sendJoinRequestMessage() throws IgniteSpiException {
+        Map<Integer, byte[]> discoData = spi.collectExchangeData(getLocalNodeId());
+        byte[] discoDataBytes = null;
+
+        if (TcpDiscoverySpi.COMPRESS_MSGS) {
+            try {
+                log.info("Compress join request data.");
+
+                discoDataBytes = U.zip(spi.marshaller().marshal(discoData));
+                discoData = null;
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSpiException(e);
+            }
+        }
+
         TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
-            spi.collectExchangeData(getLocalNodeId()));
+            discoData,
+            discoDataBytes);
 
         // Time when it has been detected, that addresses from IP finder do not respond.
         long noResStart = 0;
@@ -1543,6 +1564,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 nodeAddedMsg.topologyHistory(hist);
             }
+            else
+                clearNodeAddedMessage(nodeAddedMsg);
         }
     }
 
@@ -3762,7 +3785,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.info("Created TcpDiscoveryNodeAddedMessage [node=" + node.id() + ']');
 
                 TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
-                    node, msg.discoveryData(), spi.gridStartTime);
+                    node,
+                    msg.discoveryData(),
+                    msg.discoveryDataBytes(),
+                    spi.gridStartTime);
 
                 nodeAddedMsg.client(msg.client());
 
@@ -3798,8 +3824,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     IgniteSpiException sndErr = null;
                                     Integer res = null;
 
-                                    TcpDiscoveryJoinRequestMessage msg0 =
-                                        new TcpDiscoveryJoinRequestMessage(msg.node(), msg.discoveryData());
+                                    TcpDiscoveryJoinRequestMessage msg0 = new TcpDiscoveryJoinRequestMessage(msg.node(),
+                                        msg.discoveryData(),
+                                        msg.discoveryDataBytes());
 
                                     try {
                                         res = trySendMessageDirectly(crd, msg0);
@@ -4190,10 +4217,30 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
+                    try {
+                        if (data == null && msg.newNodeDiscoveryDataBytes() != null)
+                            data = U.unmarshalZip(spi.marshaller(), msg.newNodeDiscoveryDataBytes(), U.resolveClassLoader(spi.ignite().configuration()));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to unmarshall discovery data", e);
+                    }
+
                     if (data != null)
-                        spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()));
+                        spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()), false);
 
-                    msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+                    Map<Integer, byte[]> discoData = spi.collectExchangeData(node.id());
+
+                    if (TcpDiscoverySpi.COMPRESS_MSGS) {
+                        try {
+                            for (Map.Entry<Integer, byte[]> e : discoData.entrySet())
+                                e.setValue(U.zip(e.getValue()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to compress message data", e);
+                        }
+                    }
+
+                    msg.addDiscoveryData(locNodeId, discoData);
 
                     processMessageFailedNodes(msg);
                 }
@@ -4317,9 +4364,15 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 // Notify outside of synchronized block.
                 if (dataMap != null) {
-                    for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
-                        spi.onExchange(node.id(), entry.getKey(), entry.getValue(),
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                    ClassLoader ldr = U.resolveClassLoader(spi.ignite().configuration());
+
+                    for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) {
+                        spi.onExchange(node.id(),
+                            entry.getKey(),
+                            entry.getValue(),
+                            ldr,
+                            TcpDiscoverySpi.COMPRESS_MSGS);
+                    }
                 }
 
                 processMessageFailedNodes(msg);
@@ -5258,7 +5311,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     DiscoverySpiCustomMessage msgObj = null;
 
                     try {
-                        msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
+                        msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()), TcpDiscoverySpi.COMPRESS_MSGS);
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -5269,8 +5322,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (nextMsg != null) {
                             try {
+                                byte[] bytes = U.marshal(spi.marshaller(), nextMsg);
+
+                                if (TcpDiscoverySpi.COMPRESS_MSGS)
+                                    bytes = U.zip(bytes);
+
                                 TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
-                                    getLocalNodeId(), nextMsg, U.marshal(spi.marshaller(), nextMsg));
+                                    getLocalNodeId(), nextMsg, bytes);
 
                                 ackMsg.topologyVersion(msg.topologyVersion());
 
@@ -5420,7 +5478,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (node != null) {
                     try {
                         DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                            U.resolveClassLoader(spi.ignite().configuration()), TcpDiscoverySpi.COMPRESS_MSGS);
 
                         lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
                             msg.topologyVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index a8704e7..e5c52d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -99,6 +99,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessa
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+
 /**
  * Discovery SPI implementation that uses TCP/IP for node discovery.
  * <p>
@@ -218,6 +220,9 @@ import org.jetbrains.annotations.Nullable;
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
 public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+    /** */
+    public static final boolean COMPRESS_MSGS = getBoolean("COMPRESS_DISCO_MSGS", false);
+
     /** Failure detection timeout feature major version. */
     final static byte FAILURE_DETECTION_MAJOR_VER = 1;
 
@@ -1704,7 +1709,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected void onExchange(UUID joiningNodeID,
         UUID nodeId,
         Map<Integer, byte[]> data,
-        ClassLoader clsLdr)
+        ClassLoader clsLdr,
+        boolean zip)
     {
         if (locNode.isDaemon())
             return;
@@ -1713,7 +1719,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
             try {
-                Serializable compData = U.unmarshal(marshaller(), entry.getValue(), clsLdr);
+                Serializable compData = zip ?
+                    (Serializable)U.unmarshalZip(marshaller(), entry.getValue(), clsLdr) :
+                    (Serializable)U.unmarshal(marshaller(), entry.getValue(), clsLdr);
 
                 data0.put(entry.getKey(), compData);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 219c0ad..a2ebdfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -76,9 +76,11 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
      * @return Deserialized message,
      * @throws java.lang.Throwable if unmarshal failed.
      */
-    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
+    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr, boolean zip) throws Throwable {
         if (msg == null) {
-            msg = U.unmarshal(marsh, msgBytes, ldr);
+            msg = zip ?
+                (DiscoverySpiCustomMessage)U.unmarshalZip(marsh, msgBytes, ldr) :
+                (DiscoverySpiCustomMessage)U.unmarshal(marsh, msgBytes, ldr);
 
             assert msg != null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 4422919..a41e064 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -41,17 +41,23 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
     @GridToStringExclude
     private transient boolean directSndFailed;
 
+    /** */
+    private final byte[] discoDataBytes;
+
     /**
      * Constructor.
      *
      * @param node New node that wants to join.
      * @param discoData Discovery data.
      */
-    public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, Map<Integer, byte[]> discoData) {
+    public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node,
+        Map<Integer, byte[]> discoData,
+        byte[] discoDataBytes) {
         super(node.id());
 
         this.node = node;
         this.discoData = discoData;
+        this.discoDataBytes = discoDataBytes;
     }
 
     /**
@@ -63,6 +69,10 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
         return node;
     }
 
+    public byte[] discoveryDataBytes() {
+        return discoDataBytes;
+    }
+
     /**
      * @return Discovery data.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..7163d58 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -67,6 +67,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     /** Discovery data from new node. */
     private Map<Integer, byte[]> newNodeDiscoData;
 
+    /** */
+    private byte[] newNodeDiscoDataBytes;
+
     /** Discovery data from old nodes. */
     private Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData;
 
@@ -84,6 +87,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId,
         TcpDiscoveryNode node,
         Map<Integer, byte[]> newNodeDiscoData,
+        byte[] newNodeDiscoDataBytes,
         long gridStartTime)
     {
         super(creatorNodeId);
@@ -93,6 +97,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
 
         this.node = node;
         this.newNodeDiscoData = newNodeDiscoData;
+        this.newNodeDiscoDataBytes = newNodeDiscoDataBytes;
         this.gridStartTime = gridStartTime;
 
         oldNodesDiscoData = new LinkedHashMap<>();
@@ -112,6 +117,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
         this.clientTop = msg.clientTop;
         this.topHist = msg.topHist;
         this.newNodeDiscoData = msg.newNodeDiscoData;
+        this.newNodeDiscoDataBytes = msg.newNodeDiscoDataBytes;
         this.oldNodesDiscoData = msg.oldNodesDiscoData;
         this.gridStartTime = msg.gridStartTime;
     }
@@ -228,6 +234,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
         return newNodeDiscoData;
     }
 
+    public byte[] newNodeDiscoveryDataBytes() {
+        return newNodeDiscoDataBytes;
+    }
+
     /**
      * @return Discovery data from old nodes.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 043208c..38fbde7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -2189,7 +2189,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 if (msg instanceof TcpDiscoveryCustomEventMessage) {
                     try {
                         DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(
-                            ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()), "delegate");
+                            ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader(), false), "delegate");
 
                         if (custMsg instanceof StartRoutineAckDiscoveryMessage) {
                             log.info("Skip message send and stop node: " + msg);