You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/24 19:23:16 UTC

incubator-ignite git commit: IGNITE-709 Fix CacheListenerTest#testDeregistration()

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-709_2 0ae75d8ca -> 8fbb590f2


IGNITE-709 Fix CacheListenerTest#testDeregistration()


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fbb590f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fbb590f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fbb590f

Branch: refs/heads/ignite-709_2
Commit: 8fbb590f2e659c785c6d89ea49e1e4bb3fdc666f
Parents: 0ae75d8
Author: sevdokimov <se...@jetbrains.com>
Authored: Sun May 24 20:23:01 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Sun May 24 20:23:01 2015 +0300

----------------------------------------------------------------------
 .../discovery/CustomMessageWrapper.java         |  5 ++
 .../discovery/DiscoveryCustomMessage.java       |  5 ++
 .../cache/DynamicCacheChangeBatch.java          |  5 ++
 .../continuous/AbstractContinuousMessage.java   | 54 ++++++++++++++++++++
 .../StartRoutineAckDiscoveryMessage.java        | 20 ++------
 .../StartRoutineDiscoveryMessage.java           | 25 +++------
 .../StopRoutineAckDiscoveryMessage.java         | 19 +------
 .../continuous/StopRoutineDiscoveryMessage.java | 19 +------
 .../discovery/DiscoverySpiCustomMessage.java    |  5 ++
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  4 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 14 ++---
 .../TcpDiscoveryCustomEventMessage.java         | 31 +++++++++--
 12 files changed, 126 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 0afb6cf..23f8bda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -44,6 +44,11 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
         return res == null ? null : new CustomMessageWrapper(res);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return delegate.isMutable();
+    }
+
     /**
      * @return Delegate.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 13c0b9c..693bbef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -40,4 +40,9 @@ public interface DiscoveryCustomMessage extends Serializable {
      * @return Ack message or {@code null} if ack is not required.
      */
     @Nullable public DiscoveryCustomMessage ackMessage();
+
+    /**
+     * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+     */
+    public boolean isMutable();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index ca257a9..5fcd0e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -83,4 +83,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
new file mode 100644
index 0000000..f375777
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage {
+    /** Routine ID. */
+    protected final UUID routineId;
+
+    /**
+     * @param id Id.
+     */
+    protected AbstractContinuousMessage(UUID id) {
+        routineId = id;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean incrementMinorTopologyVersion() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 66892b1..3e3e6fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -26,13 +26,10 @@ import java.util.*;
 /**
  *
  */
-public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Routine ID. */
-    private final UUID routineId;
-
     /** */
     private final Map<UUID, IgniteCheckedException> errs;
 
@@ -41,13 +38,9 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
      * @param errs Errs.
      */
     public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
-        this.routineId = routineId;
-        this.errs = new HashMap<>(errs);
-    }
+        super(routineId);
 
-    /** {@inheritDoc} */
-    @Override public boolean incrementMinorTopologyVersion() {
-        return false;
+        this.errs = new HashMap<>(errs);
     }
 
     /** {@inheritDoc} */
@@ -56,13 +49,6 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
     }
 
     /**
-     * @return Routine ID.
-     */
-    public UUID routineId() {
-        return routineId;
-    }
-
-    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 2199fd0..ec0d36b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -26,13 +26,10 @@ import java.util.*;
 /**
  *
  */
-public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
+public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Routine ID. */
-    private final UUID routineId;
-
     /** */
     private final StartRequestData startReqData;
 
@@ -44,13 +41,9 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
      * @param startReqData Start request data.
      */
     public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
-        this.routineId = routineId;
-        this.startReqData = startReqData;
-    }
+        super(routineId);
 
-    /** {@inheritDoc} */
-    @Override public boolean incrementMinorTopologyVersion() {
-        return false;
+        this.startReqData = startReqData;
     }
 
     /**
@@ -69,13 +62,6 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
     }
 
     /**
-     * @return Routine ID.
-     */
-    public UUID routineId() {
-        return routineId;
-    }
-
-    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -83,6 +69,11 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
         return new StartRoutineAckDiscoveryMessage(routineId, errs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index a640222..350f13c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -25,34 +25,19 @@ import java.util.*;
 /**
  *
  */
-public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Routine ID. */
-    private final UUID routineId;
-
     /**
      * @param routineId Routine id.
      */
     public StopRoutineAckDiscoveryMessage(UUID routineId) {
-        this.routineId = routineId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean incrementMinorTopologyVersion() {
-        return false;
+        super(routineId);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
         return null;
     }
-
-    /**
-     * @return Routine ID.
-     */
-    public UUID routineId() {
-        return routineId;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
index e8a43a3..5b0dc5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -25,30 +25,15 @@ import java.util.*;
 /**
  *
  */
-public class StopRoutineDiscoveryMessage implements DiscoveryCustomMessage {
+public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Routine ID. */
-    private final UUID routineId;
-
     /**
      * @param routineId Routine id.
      */
     public StopRoutineDiscoveryMessage(UUID routineId) {
-        this.routineId = routineId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean incrementMinorTopologyVersion() {
-        return false;
-    }
-
-    /**
-     * @return Routine ID.
-     */
-    public UUID routineId() {
-        return routineId;
+        super(routineId);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 72ba9db..15e943b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -32,4 +32,9 @@ public interface DiscoverySpiCustomMessage extends Serializable {
      * Called when message passed the ring.
      */
     @Nullable public DiscoverySpiCustomMessage ackMessage();
+
+    /**
+     * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+     */
+    public boolean isMutable();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 46e9635..22bb49b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -461,7 +461,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
             throw new IgniteException("Failed to send custom message: client is disconnected");
 
         try {
-            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt)));
+            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -1481,7 +1481,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
                     if (node != null && node.visible()) {
                         try {
-                            DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+                            DiscoverySpiCustomMessage msgObj = msg.message(marsh);
 
                             notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 0164e5c..34e1ca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1266,7 +1266,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         try {
-            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt)));
+            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -4536,7 +4536,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                     DiscoverySpiCustomMessage msgObj = null;
 
                     try {
-                        msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+                        msgObj = msg.message(marsh);
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -4547,7 +4547,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
 
                         if (nextMsg != null) {
                             try {
-                                addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg)));
+                                addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
+                                    marsh.marshal(nextMsg)));
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(log, "Failed to marshal discovery custom message.", e);
@@ -4584,13 +4585,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
 
             if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
-                assert msg.messageBytes() != null;
-
                 TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
 
                 if (node != null) {
                     try {
-                        DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+                        DiscoverySpiCustomMessage msgObj = msg.message(marsh);
 
                         lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
                             msg.topologyVersion(),
@@ -4599,7 +4598,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                             hist,
                             msgObj);
 
-                        msg.messageBytes(marsh.marshal(msgObj));
+                        if (msgObj.isMutable())
+                            msg.message(msgObj, marsh.marshal(msgObj));
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 372aa18..0739c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.spi.discovery.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -31,15 +34,21 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
     private static final long serialVersionUID = 0L;
 
     /** */
+    private transient volatile DiscoverySpiCustomMessage msg;
+
+    /** */
     private byte[] msgBytes;
 
     /**
      * @param creatorNodeId Creator node id.
+     * @param msg Message.
      * @param msgBytes Serialized message.
      */
-    public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, byte[] msgBytes) {
+    public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
+        @NotNull byte[] msgBytes) {
         super(creatorNodeId);
 
+        this.msg = msg;
         this.msgBytes = msgBytes;
     }
 
@@ -51,12 +60,28 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
     }
 
     /**
-     * @param msgBytes New message bytes.
+     * @param msg Message.
+     * @param msgBytes Serialized message.
      */
-    public void messageBytes(byte[] msgBytes) {
+    public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) {
+        this.msg = msg;
         this.msgBytes = msgBytes;
     }
 
+    /**
+     * @return Deserialized message,
+     * @throws java.lang.Throwable if unmarshal failed.
+     */
+    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable {
+        if (msg == null) {
+            msg = marsh.unmarshal(msgBytes, U.gridClassLoader());
+
+            assert msg != null;
+        }
+
+        return msg;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());