You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:38 UTC

[46/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0000000,6996e6f..b918b68
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1,0 -1,2043 +1,2153 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.managers.communication;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
++import org.apache.ignite.plugin.extensions.communication.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.communication.*;
+ import org.apache.ignite.internal.managers.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.Map.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.*;
+ import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy.*;
+ 
+ /**
+  * Grid communication manager.
+  */
+ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
+     /** Max closed topics to store. */
+     public static final int MAX_CLOSED_TOPICS = 10240;
+ 
+     /** Listeners by topic. */
+     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
+ 
+     /** Disconnect listeners. */
+     private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
+ 
+     /** Public pool. */
+     private ExecutorService pubPool;
+ 
+     /** Internal P2P pool. */
+     private ExecutorService p2pPool;
+ 
+     /** Internal system pool. */
+     private ExecutorService sysPool;
+ 
+     /** Internal management pool. */
+     private ExecutorService mgmtPool;
+ 
+     /** Affinity assignment executor service. */
+     private ExecutorService affPool;
+ 
+     /** Utility cache pool. */
+     private ExecutorService utilityCachePool;
+ 
+     /** Discovery listener. */
+     private GridLocalEventListener discoLsnr;
+ 
+     /** */
+     private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap =
+         new ConcurrentHashMap8<>();
+ 
+     /** Local node ID. */
+     private final UUID locNodeId;
+ 
+     /** Discovery delay. */
+     private final long discoDelay;
+ 
+     /** Cache for messages that were received prior to discovery. */
+     private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap =
+         new ConcurrentHashMap8<>();
+ 
+     /** Communication message listener. */
+     private CommunicationListener<Serializable> commLsnr;
+ 
+     /** Grid marshaller. */
+     private final IgniteMarshaller marsh;
+ 
+     /** Busy lock. */
+     private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+ 
+     /** Lock to sync maps access. */
+     private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ 
+     /** Message cache. */
+     private ThreadLocal<IgniteBiTuple<Object, byte[]>> cacheMsg =
+         new GridThreadLocal<IgniteBiTuple<Object, byte[]>>() {
+             @Nullable @Override protected IgniteBiTuple<Object, byte[]> initialValue() {
+                 return null;
+             }
+         };
+ 
+     /** Fully started flag. When set to true, can send and receive messages. */
+     private volatile boolean started;
+ 
+     /** Closed topics. */
+     private final GridBoundedConcurrentLinkedHashSet<Object> closedTopics =
+         new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS, MAX_CLOSED_TOPICS, 0.75f, 256,
+             PER_SEGMENT_Q_OPTIMIZED_RMV);
+ 
+     /** Workers count. */
+     private final LongAdder workersCnt = new LongAdder();
+ 
++    /** */
++    private int pluginMsg = GridTcpCommunicationMessageFactory.MAX_COMMON_TYPE;
++
++    /** */
++    private Map<Byte, GridTcpCommunicationMessageProducer> pluginMsgs;
++
++    /** */
++    private MessageFactory msgFactory;
++
++    /** */
++    private MessageWriterFactory writerFactory;
++
++    /** */
++    private MessageReaderFactory readerFactory;
++
+     /**
+      * @param ctx Grid kernal context.
+      */
+     @SuppressWarnings("deprecation")
+     public GridIoManager(GridKernalContext ctx) {
+         super(ctx, ctx.config().getCommunicationSpi());
+ 
+         locNodeId = ctx.localNodeId();
+ 
+         discoDelay = ctx.config().getDiscoveryStartupDelay();
+ 
+         marsh = ctx.config().getMarshaller();
+     }
+ 
+     /**
++     * @param producer Message producer.
++     * @return Message type code.
++     */
++    public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer) {
++        int nextMsg = ++pluginMsg;
++
++        if (nextMsg > Byte.MAX_VALUE)
++            throw new IgniteException();
++
++        if (pluginMsgs == null)
++            pluginMsgs = new HashMap<>();
++
++        pluginMsgs.put((byte)nextMsg, producer);
++
++        return (byte)nextMsg;
++    }
++
++    /**
++     * Initializes manager (called prior to discovery start, but after all other components).
++     */
++    public void initMessageFactory() {
++        final GridTcpCommunicationMessageProducer[] common = GridTcpCommunicationMessageFactory.commonProducers();
++
++        final GridTcpCommunicationMessageProducer[] producers;
++
++        if (pluginMsgs != null) {
++            producers = Arrays.copyOf(common, pluginMsg + 1);
++
++            for (Map.Entry<Byte, GridTcpCommunicationMessageProducer> e : pluginMsgs.entrySet()) {
++                assert producers[e.getKey()] == null : e.getKey();
++
++                producers[e.getKey()] = e.getValue();
++            }
++
++            pluginMsgs = null;
++        }
++        else
++            producers = common;
++
++        msgFactory = new MessageFactory() {
++            @Override public GridTcpCommunicationMessageAdapter create(byte type) {
++                GridTcpCommunicationMessageAdapter msg;
++
++                if (type < 0 || type >= producers.length)
++                    msg = GridTcpCommunicationMessageFactory.create(type);
++                else {
++                    GridTcpCommunicationMessageProducer producer = producers[type];
++
++                    if (producer == null)
++                        throw new IllegalStateException("Common message type producer is not registered: " + type);
++
++                    msg = producer.create(type);
++                }
++
++                msg.setReader(readerFactory.reader());
++
++                return msg;
++            }
++        };
++    }
++
++    public MessageFactory messageFactory() {
++        assert msgFactory != null;
++
++        return msgFactory;
++    }
++
++    /**
+      * Resets metrics for this manager.
+      */
+     public void resetMetrics() {
+         getSpi().resetMetrics();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("deprecation")
+     @Override public void start() throws IgniteCheckedException {
+         assertParameter(discoDelay > 0, "discoveryStartupDelay > 0");
+ 
+         startSpi();
+ 
+         pubPool = ctx.config().getExecutorService();
+         p2pPool = ctx.config().getPeerClassLoadingExecutorService();
+         sysPool = ctx.config().getSystemExecutorService();
+         mgmtPool = ctx.config().getManagementExecutorService();
+         utilityCachePool = ctx.utilityCachePool();
+         affPool = Executors.newFixedThreadPool(1);
+ 
+         getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
+             @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
+                 try {
+                     onMessage0(nodeId, (GridIoMessage)msg, msgC);
+                 }
+                 catch (ClassCastException ignored) {
+                     U.error(log, "Communication manager received message of unknown type (will ignore): " +
+                         msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+                         "which is illegal - make sure to send messages only via GridProjection API.");
+                 }
+             }
+ 
+             @Override public void onDisconnected(UUID nodeId) {
+                 for (GridDisconnectListener lsnr : disconnectLsnrs)
+                     lsnr.onNodeDisconnected(nodeId);
+             }
+         });
+ 
++        MessageWriterFactory[] writerExt = ctx.plugins().extensions(MessageWriterFactory.class);
++
++        if (writerExt != null && writerExt.length > 0)
++            writerFactory = writerExt[0];
++        else {
++            writerFactory = new MessageWriterFactory() {
++                @Override public MessageWriter writer() {
++                    return new GridTcpCommunicationMessageWriter();
++                }
++            };
++        }
++
++        MessageReaderFactory[] readerExt = ctx.plugins().extensions(MessageReaderFactory.class);
++
++        if (readerExt != null && readerExt.length > 0)
++            readerFactory = readerExt[0];
++        else {
++            readerFactory = new MessageReaderFactory() {
++                @Override public MessageReader reader() {
++                    return new GridTcpCommunicationMessageReader(msgFactory);
++                }
++            };
++        }
++
+         if (log.isDebugEnabled())
+             log.debug(startInfo());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
+     @Override public void onKernalStart0() throws IgniteCheckedException {
+         discoLsnr = new GridLocalEventListener() {
+             @SuppressWarnings({"TooBroadScope", "fallthrough"})
+             @Override public void onEvent(IgniteEvent evt) {
+                 assert evt instanceof IgniteDiscoveryEvent : "Invalid event: " + evt;
+ 
+                 IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+ 
+                 UUID nodeId = discoEvt.eventNode().id();
+ 
+                 switch (evt.type()) {
+                     case EVT_NODE_JOINED:
+                         assert waitMap.get(nodeId) == null; // We can't receive messages from undiscovered nodes.
+ 
+                         break;
+ 
+                     case EVT_NODE_LEFT:
+                     case EVT_NODE_FAILED:
+                         for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e :
+                             msgSetMap.entrySet()) {
+                             ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
+ 
+                             GridCommunicationMessageSet set;
+ 
+                             boolean empty;
+ 
+                             synchronized (map) {
+                                 set = map.remove(nodeId);
+ 
+                                 empty = map.isEmpty();
+                             }
+ 
+                             if (set != null) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Removed message set due to node leaving grid: " + set);
+ 
+                                 // Unregister timeout listener.
+                                 ctx.timeout().removeTimeoutObject(set);
+ 
+                                 // Node may still send stale messages for this topic
+                                 // even after discovery notification is done.
+                                 closedTopics.add(set.topic());
+                             }
+ 
+                             if (empty)
+                                 msgSetMap.remove(e.getKey(), map);
+                         }
+ 
+                         // Clean up delayed and ordered messages (need exclusive lock).
+                         lock.writeLock().lock();
+ 
+                         try {
+                             ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
+ 
+                             if (log.isDebugEnabled())
+                                 log.debug("Removed messages from discovery startup delay list " +
+                                     "(sender node left topology): " + waitList);
+                         }
+                         finally {
+                             lock.writeLock().unlock();
+                         }
+ 
+                         break;
+ 
+                     default:
+                         assert false : "Unexpected event: " + evt;
+                 }
+             }
+         };
+ 
+         ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ 
+         // Make sure that there are no stale messages due to window between communication
+         // manager start and kernal start.
+         // 1. Process wait list.
+         Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>();
+ 
+         lock.writeLock().lock();
+ 
+         try {
+             started = true;
+ 
+             for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) {
+                 if (ctx.discovery().node(e.getKey()) != null) {
+                     ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey());
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Processing messages from discovery startup delay list: " + waitList);
+ 
+                     if (waitList != null)
+                         delayedMsgs.add(waitList);
+                 }
+             }
+         }
+         finally {
+             lock.writeLock().unlock();
+         }
+ 
+         // After write lock released.
+         if (!delayedMsgs.isEmpty()) {
+             for (Collection<DelayedMessage> col : delayedMsgs)
+                 for (DelayedMessage msg : col)
+                     commLsnr.onMessage(msg.nodeId(), msg.message(), msg.callback());
+         }
+ 
+         // 2. Process messages sets.
+         for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e : msgSetMap.entrySet()) {
+             ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
+ 
+             for (GridCommunicationMessageSet set : map.values()) {
+                 if (ctx.discovery().node(set.nodeId()) == null) {
+                     // All map modifications should be synced for consistency.
+                     boolean rmv;
+ 
+                     synchronized (map) {
+                         rmv = map.remove(set.nodeId(), set);
+                     }
+ 
+                     if (rmv) {
+                         if (log.isDebugEnabled())
+                             log.debug("Removed message set due to node leaving grid: " + set);
+ 
+                         // Unregister timeout listener.
+                         ctx.timeout().removeTimeoutObject(set);
+                     }
+ 
+                 }
+             }
+ 
+             boolean rmv;
+ 
+             synchronized (map) {
+                 rmv = map.isEmpty();
+             }
+ 
+             if (rmv) {
+                 msgSetMap.remove(e.getKey(), map);
+ 
+                 // Node may still send stale messages for this topic
+                 // even after discovery notification is done.
+                 closedTopics.add(e.getKey());
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("BusyWait")
+     @Override public void onKernalStop0(boolean cancel) {
+         // No more communication messages.
+         getSpi().setListener(null);
+ 
+         busyLock.writeLock();
+ 
+         U.shutdownNow(getClass(), affPool, log);
+ 
+         boolean interrupted = false;
+ 
+         while (workersCnt.sum() != 0) {
+             try {
+                 Thread.sleep(200);
+             }
+             catch (InterruptedException ignored) {
+                 interrupted = true;
+             }
+         }
+ 
+         if (interrupted)
+             Thread.currentThread().interrupt();
+ 
+         GridEventStorageManager evtMgr = ctx.event();
+ 
+         if (evtMgr != null && discoLsnr != null)
+             evtMgr.removeLocalEventListener(discoLsnr);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void stop(boolean cancel) throws IgniteCheckedException {
+         stopSpi();
+ 
+         // Clear cache.
+         cacheMsg.set(null);
+ 
+         if (log.isDebugEnabled())
+             log.debug(stopInfo());
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param msg Message bytes.
+      * @param msgC Closure to call when message processing finished.
+      */
+     @SuppressWarnings("fallthrough")
+     private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
+         assert nodeId != null;
+         assert msg != null;
+ 
+         if (!busyLock.tryReadLock()) {
+             if (log.isDebugEnabled())
+                 log.debug("Received communication message while stopping grid.");
+ 
+             return;
+         }
+ 
+         try {
+             // Check discovery.
+             ClusterNode node = ctx.discovery().node(nodeId);
+ 
+             if (node == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']');
+ 
+                 return; // We can't receive messages from non-discovered ones.
+             }
+ 
+             if (msg.topic() == null) {
+                 int topicOrd = msg.topicOrdinal();
+ 
+                 msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) : marsh.unmarshal(msg.topicBytes(), null));
+             }
+ 
+             if (!started) {
+                 lock.readLock().lock();
+ 
+                 try {
+                     if (!started) { // Sets to true in write lock, so double checking.
+                         // Received message before valid context is set to manager.
+                         if (log.isDebugEnabled())
+                             log.debug("Adding message to waiting list [senderId=" + nodeId +
+                                 ", msg=" + msg + ']');
+ 
+                         ConcurrentLinkedDeque8<DelayedMessage> list =
+                             F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque());
+ 
+                         assert list != null;
+ 
+                         list.add(new DelayedMessage(nodeId, msg, msgC));
+ 
+                         return;
+                     }
+                 }
+                 finally {
+                     lock.readLock().unlock();
+                 }
+             }
+ 
+             // If message is P2P, then process in P2P service.
+             // This is done to avoid extra waiting and potential deadlocks
+             // as thread pool may not have any available threads to give.
+             GridIoPolicy plc = msg.policy();
+ 
+             switch (plc) {
+                 case P2P_POOL: {
+                     processP2PMessage(nodeId, msg, msgC);
+ 
+                     break;
+                 }
+ 
+                 case PUBLIC_POOL:
+                 case SYSTEM_POOL:
+                 case MANAGEMENT_POOL:
+                 case AFFINITY_POOL:
+                 case UTILITY_CACHE_POOL: {
+                     if (msg.isOrdered())
+                         processOrderedMessage(nodeId, msg, plc, msgC);
+                     else
+                         processRegularMessage(nodeId, msg, plc, msgC);
+ 
+                     break;
+                 }
+             }
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to process message (will ignore): " + msg, e);
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Gets execution pool for policy.
+      *
+      * @param plc Policy.
+      * @return Execution pool.
+      */
+     private Executor pool(GridIoPolicy plc) {
+         switch (plc) {
+             case P2P_POOL:
+                 return p2pPool;
+             case SYSTEM_POOL:
+                 return sysPool;
+             case PUBLIC_POOL:
+                 return pubPool;
+             case MANAGEMENT_POOL:
+                 return mgmtPool;
+             case AFFINITY_POOL:
+                 return affPool;
+             case UTILITY_CACHE_POOL:
+                 assert utilityCachePool != null : "Utility cache pool is not configured.";
+ 
+                 return utilityCachePool;
+ 
+             default: {
+                 assert false : "Invalid communication policy: " + plc;
+ 
+                 // Never reached.
+                 return null;
+             }
+         }
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param msg Message.
+      * @param msgC Closure to call when message processing finished.
+      */
+     private void processP2PMessage(
+         final UUID nodeId,
+         final GridIoMessage msg,
+         final IgniteRunnable msgC
+     ) {
+         workersCnt.increment();
+ 
+         Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+             @Override protected void body() {
+                 try {
+                     threadProcessingMessage(true);
+ 
+                     GridMessageListener lsnr = lsnrMap.get(msg.topic());
+ 
+                     if (lsnr == null)
+                         return;
+ 
+                     Object obj = msg.message();
+ 
+                     assert obj != null;
+ 
+                     lsnr.onMessage(nodeId, obj);
+                 }
+                 finally {
+                     threadProcessingMessage(false);
+ 
+                     workersCnt.decrement();
+ 
+                     msgC.run();
+                 }
+             }
+         };
+ 
+         try {
+             p2pPool.execute(c);
+         }
+         catch (RejectedExecutionException e) {
+             U.error(log, "Failed to process P2P message due to execution rejection. Increase the upper bound " +
+                 "on 'ExecutorService' provided by 'GridConfiguration.getPeerClassLoadingExecutorService()'. " +
+                 "Will attempt to process message in the listener thread instead.", e);
+ 
+             c.run();
+         }
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param msg Message.
+      * @param plc Execution policy.
+      * @param msgC Closure to call when message processing finished.
+      */
+     private void processRegularMessage(
+         final UUID nodeId,
+         final GridIoMessage msg,
+         GridIoPolicy plc,
+         final IgniteRunnable msgC
+     ) {
+         workersCnt.increment();
+ 
+         Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+             @Override protected void body() {
+                 try {
+                     threadProcessingMessage(true);
+ 
+                     processRegularMessage0(msg, nodeId);
+                 }
+                 finally {
+                     threadProcessingMessage(false);
+ 
+                     workersCnt.decrement();
+ 
+                     msgC.run();
+                 }
+             }
+         };
+ 
+         try {
+             pool(plc).execute(c);
+         }
+         catch (RejectedExecutionException e) {
+             U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +
+                 "on 'ExecutorService' provided by 'GridConfiguration.getExecutorService()'. " +
+                 "Will attempt to process message in the listener thread instead.", e);
+ 
+             c.run();
+         }
+     }
+ 
+     /**
+      * @param msg Message.
+      * @param nodeId Node ID.
+      */
+     @SuppressWarnings("deprecation")
+     private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
+         GridMessageListener lsnr = lsnrMap.get(msg.topic());
+ 
+         if (lsnr == null)
+             return;
+ 
+         Object obj = msg.message();
+ 
+         assert obj != null;
+ 
+         lsnr.onMessage(nodeId, obj);
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param msg Ordered message.
+      * @param plc Execution policy.
+      * @param msgC Closure to call when message processing finished ({@code null} for sync processing).
+      */
+     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+     private void processOrderedMessage(
+         final UUID nodeId,
+         final GridIoMessage msg,
+         final GridIoPolicy plc,
+         @Nullable final IgniteRunnable msgC
+     ) {
+         assert msg != null;
+ 
+         long timeout = msg.timeout();
+         boolean skipOnTimeout = msg.skipOnTimeout();
+ 
+         boolean isNew = false;
+ 
+         ConcurrentMap<UUID, GridCommunicationMessageSet> map;
+ 
+         GridCommunicationMessageSet set = null;
+ 
+         while (true) {
+             map = msgSetMap.get(msg.topic());
+ 
+             if (map == null) {
+                 set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg);
+ 
+                 map = new ConcurrentHashMap0<>();
+ 
+                 map.put(nodeId, set);
+ 
+                 ConcurrentMap<UUID, GridCommunicationMessageSet> old = msgSetMap.putIfAbsent(
+                     msg.topic(), map);
+ 
+                 if (old != null)
+                     map = old;
+                 else {
+                     isNew = true;
+ 
+                     // Put succeeded.
+                     break;
+                 }
+             }
+ 
+             boolean rmv = false;
+ 
+             synchronized (map) {
+                 if (map.isEmpty())
+                     rmv = true;
+                 else {
+                     set = map.get(nodeId);
+ 
+                     if (set == null) {
+                         GridCommunicationMessageSet old = map.putIfAbsent(nodeId,
+                             set = new GridCommunicationMessageSet(plc, msg.topic(),
+                                 nodeId, timeout, skipOnTimeout, msg));
+ 
+                         assert old == null;
+ 
+                         isNew = true;
+ 
+                         // Put succeeded.
+                         break;
+                     }
+                 }
+             }
+ 
+             if (rmv)
+                 msgSetMap.remove(msg.topic(), map);
+             else {
+                 assert set != null;
+                 assert !isNew;
+ 
+                 set.add(msg);
+ 
+                 break;
+             }
+         }
+ 
+         if (isNew && ctx.discovery().node(nodeId) == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Message is ignored as sender has left the grid: " + msg);
+ 
+             assert map != null;
+ 
+             boolean rmv;
+ 
+             synchronized (map) {
+                 map.remove(nodeId);
+ 
+                 rmv = map.isEmpty();
+             }
+ 
+             if (rmv)
+                 msgSetMap.remove(msg.topic(), map);
+ 
+             return;
+         }
+ 
+         if (isNew && set.endTime() != Long.MAX_VALUE)
+             ctx.timeout().addTimeoutObject(set);
+ 
+         if (set.reserved()) {
+             // Set is reserved which means that it is currently processed by worker thread.
+             if (msgC != null)
+                 msgC.run();
+ 
+             return;
+         }
+ 
+         final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+ 
+         if (lsnr == null) {
+             if (closedTopics.contains(msg.topic())) {
+                 if (log.isDebugEnabled())
+                     log.debug("Message is ignored as it came for the closed topic: " + msg);
+ 
+                 assert map != null;
+ 
+                 msgSetMap.remove(msg.topic(), map);
+             }
+             else if (log.isDebugEnabled()) {
+                 // Note that we simply keep messages if listener is not
+                 // registered yet, until one will be registered.
+                 log.debug("Received message for unknown listener (messages will be kept until a " +
+                     "listener is registered): " + msg);
+             }
+ 
+             // Mark the message as processed.
+             if (msgC != null)
+                 msgC.run();
+ 
+             return;
+         }
+ 
+         if (msgC == null) {
+             // Message from local node can be processed in sync manner.
+             assert locNodeId.equals(nodeId);
+ 
+             unwindMessageSet(set, lsnr);
+ 
+             return;
+         }
+ 
+         // Set is not reserved and new worker should be submitted.
+         workersCnt.increment();
+ 
+         final GridCommunicationMessageSet msgSet0 = set;
+ 
+         Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+             @Override protected void body() {
+                 try {
+                     threadProcessingMessage(true);
+ 
+                     unwindMessageSet(msgSet0, lsnr);
+                 }
+                 finally {
+                     threadProcessingMessage(false);
+ 
+                     workersCnt.decrement();
+ 
+                     msgC.run();
+                 }
+             }
+         };
+ 
+         try {
+             pool(plc).execute(c);
+         }
+         catch (RejectedExecutionException e) {
+             U.error(log, "Failed to process ordered message due to execution rejection. " +
+                 "Increase the upper bound on executor service provided by corresponding " +
+                 "configuration property. Will attempt to process message in the listener " +
+                 "thread instead [msgPlc=" + plc + ']', e);
+ 
+             c.run();
+         }
+     }
+ 
+     /**
+      * @param msgSet Message set to unwind.
+      * @param lsnr Listener to notify.
+      */
+     private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr) {
+         // Loop until message set is empty or
+         // another thread owns the reservation.
+         while (true) {
+             if (msgSet.reserve()) {
+                 try {
+                     msgSet.unwind(lsnr);
+                 }
+                 finally {
+                     msgSet.release();
+                 }
+ 
+                 // Check outside of reservation block.
+                 if (!msgSet.changed()) {
+                     if (log.isDebugEnabled())
+                         log.debug("Message set has not been changed: " + msgSet);
+ 
+                     break;
+                 }
+             }
+             else {
+                 if (log.isDebugEnabled())
+                     log.debug("Another thread owns reservation: " + msgSet);
+ 
+                 return;
+             }
+         }
+     }
+ 
+     /**
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
+      * @param topicOrd GridTopic enumeration ordinal.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param ordered Ordered flag.
+      * @param timeout Timeout.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     private void send(
+         ClusterNode node,
+         Object topic,
+         int topicOrd,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc,
+         boolean ordered,
+         long timeout,
+         boolean skipOnTimeout
+     ) throws IgniteCheckedException {
+         assert node != null;
+         assert topic != null;
+         assert msg != null;
+         assert plc != null;
+ 
+         GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+ 
+         if (locNodeId.equals(node.id())) {
+             assert plc != P2P_POOL;
+ 
+             CommunicationListener commLsnr = this.commLsnr;
+ 
+             if (commLsnr == null)
+                 throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
+ 
+             if (ordered)
+                 processOrderedMessage(locNodeId, ioMsg, plc, null);
+             else
+                 processRegularMessage0(ioMsg, locNodeId);
+         }
+         else {
++            ioMsg.setWriter(writerFactory.writer());
++
+             if (topicOrd < 0)
+                 ioMsg.topicBytes(marsh.marshal(topic));
+ 
+             try {
+                 getSpi().sendMessage(node, ioMsg);
+             }
+             catch (IgniteSpiException e) {
+                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
+                     "TCP connection cannot be established due to firewall issues) " +
+                     "[node=" + node + ", topic=" + topic +
+                     ", msg=" + msg + ", policy=" + plc + ']', e);
+             }
+         }
+     }
+ 
+     /**
+      * @param nodeId Id of destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void send(UUID nodeId, Object topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+         throws IgniteCheckedException {
+         ClusterNode node = ctx.discovery().node(nodeId);
+ 
+         if (node == null)
+             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+ 
+         send(node, topic, msg, plc);
+     }
+ 
+     /**
+      * @param nodeId Id of destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     @SuppressWarnings("TypeMayBeWeakened")
+     public void send(UUID nodeId, GridTopic topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+         throws IgniteCheckedException {
+         ClusterNode node = ctx.discovery().node(nodeId);
+ 
+         if (node == null)
+             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+ 
+         send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+     }
+ 
+     /**
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void send(ClusterNode node, Object topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+         throws IgniteCheckedException {
+         send(node, topic, -1, msg, plc, false, 0, false);
+     }
+ 
+     /**
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void send(ClusterNode node, GridTopic topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+         throws IgniteCheckedException {
+         send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+     }
+ 
+     /**
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param timeout Timeout to keep a message on receiving queue.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void sendOrderedMessage(
+         ClusterNode node,
+         Object topic,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc,
+         long timeout,
+         boolean skipOnTimeout
+     ) throws IgniteCheckedException {
+         assert timeout > 0 || skipOnTimeout;
+ 
+         send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+     }
+ 
+     /**
+      * @param nodeId Destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param timeout Timeout to keep a message on receiving queue.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void sendOrderedMessage(
+         UUID nodeId,
+         Object topic,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc,
+         long timeout,
+         boolean skipOnTimeout
+     ) throws IgniteCheckedException {
+         assert timeout > 0 || skipOnTimeout;
+ 
+         ClusterNode node = ctx.discovery().node(nodeId);
+ 
+         if (node == null)
+             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+ 
+         send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+     }
+ 
+     /**
+      * @param nodes Destination nodes.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param timeout Timeout to keep a message on receiving queue.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void sendOrderedMessage(
+         Collection<? extends ClusterNode> nodes,
+         Object topic,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc,
+         long timeout,
+         boolean skipOnTimeout
+     )
+         throws IgniteCheckedException {
+         assert timeout > 0 || skipOnTimeout;
+ 
+         send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+     }
+ 
+     /**
+      * @param nodes Destination nodes.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void send(
+         Collection<? extends ClusterNode> nodes,
+         Object topic,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc
+     ) throws IgniteCheckedException {
+         send(nodes, topic, -1, msg, plc, false, 0, false);
+     }
+ 
+     /**
+      * @param nodes Destination nodes.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void send(
+         Collection<? extends ClusterNode> nodes,
+         GridTopic topic,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc
+     ) throws IgniteCheckedException {
+         send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+     }
+ 
+     /**
+      * Sends a peer deployable user message.
+      *
+      * @param nodes Destination nodes.
+      * @param msg Message to send.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
+         sendUserMessage(nodes, msg, null, false, 0);
+     }
+ 
+     /**
+      * Sends a peer deployable user message.
+      *
+      * @param nodes Destination nodes.
+      * @param msg Message to send.
+      * @param topic Message topic to use.
+      * @param ordered Is message ordered?
+      * @param timeout Message timeout in milliseconds for ordered messages.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     @SuppressWarnings("ConstantConditions")
+     public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
+         @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException {
+         boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
+ 
+         byte[] serMsg = null;
+         byte[] serTopic = null;
+ 
+         if (!loc) {
+             serMsg = marsh.marshal(msg);
+ 
+             if (topic != null)
+                 serTopic = marsh.marshal(topic);
+         }
+ 
+         GridDeployment dep = null;
+ 
+         String depClsName = null;
+ 
+         if (ctx.config().isPeerClassLoadingEnabled()) {
+             Class<?> cls0 = U.detectClass(msg);
+ 
+             if (U.isJdk(cls0) && topic != null)
+                 cls0 = U.detectClass(topic);
+ 
+             dep = ctx.deploy().deploy(cls0, U.detectClassLoader(cls0));
+ 
+             if (dep == null)
+                 throw new IgniteDeploymentException("Failed to deploy user message: " + msg);
+ 
+             depClsName = cls0.getName();
+         }
+ 
+         GridTcpCommunicationMessageAdapter ioMsg = new GridIoUserMessage(
+             msg,
+             serMsg,
+             depClsName,
+             topic,
+             serTopic,
+             dep != null ? dep.classLoaderId() : null,
+             dep != null ? dep.deployMode() : null,
+             dep != null ? dep.userVersion() : null,
+             dep != null ? dep.participants() : null);
+ 
+         if (ordered)
+             sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
+         else if (loc)
+             send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+         else {
+             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
+ 
+             Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
+ 
+             if (locNode != null)
+                 send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ 
+             if (!rmtNodes.isEmpty())
+                 send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+         }
+     }
+ 
+     /**
+      * @param topic Topic to subscribe to.
+      * @param p Message predicate.
+      */
+     public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
+         if (p != null) {
+             try {
+                 addMessageListener(TOPIC_COMM_USER,
+                     new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+             }
+             catch (IgniteCheckedException e) {
+                 throw new IgniteException(e);
+             }
+         }
+     }
+ 
+     /**
+      * @param topic Topic to unsubscribe from.
+      * @param p Message predicate.
+      */
+     public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) {
+         try {
+             removeMessageListener(TOPIC_COMM_USER,
+                 new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+         }
+         catch (IgniteCheckedException e) {
+             throw new IgniteException(e);
+         }
+     }
+ 
+     /**
+      * @param nodes Destination nodes.
+      * @param topic Topic to send the message to.
+      * @param topicOrd Topic ordinal value.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param ordered Ordered flag.
+      * @param timeout Message timeout.
+      * @param skipOnTimeout Whether message can be skipped in timeout.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     private void send(
+         Collection<? extends ClusterNode> nodes,
+         Object topic,
+         int topicOrd,
+         GridTcpCommunicationMessageAdapter msg,
+         GridIoPolicy plc,
+         boolean ordered,
+         long timeout,
+         boolean skipOnTimeout
+     ) throws IgniteCheckedException {
+         assert nodes != null;
+         assert topic != null;
+         assert msg != null;
+         assert plc != null;
+ 
+         if (!ordered)
+             assert F.find(nodes, null, F.localNode(locNodeId)) == null :
+                 "Internal GridGain code should never call the method with local node in a node list.";
+ 
+         try {
+             // Small optimization, as communication SPIs may have lighter implementation for sending
+             // messages to one node vs. many.
+             if (!nodes.isEmpty()) {
+                 boolean first = true;
+ 
+                 for (ClusterNode node : nodes) {
+                     GridTcpCommunicationMessageAdapter msg0 = first ? msg : msg.clone();
+ 
+                     first = false;
+ 
+                     send(node, topic, topicOrd, msg0, plc, ordered, timeout, skipOnTimeout);
+                 }
+             }
+             else if (log.isDebugEnabled())
+                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
+                     msg + ", policy=" + plc + ']');
+         }
+         catch (IgniteSpiException e) {
+             throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " +
+                 "TCP connection cannot be established due to firewall issues) " +
+                 "[nodes=" + nodes + ", topic=" + topic +
+                 ", msg=" + msg + ", policy=" + plc + ']', e);
+         }
+     }
+ 
+     /**
+      * @param topic Listener's topic.
+      * @param lsnr Listener to add.
+      */
+     @SuppressWarnings({"TypeMayBeWeakened", "deprecation"})
+     public void addMessageListener(GridTopic topic, GridMessageListener lsnr) {
+         addMessageListener((Object)topic, lsnr);
+     }
+ 
+     /**
+      * @param lsnr Listener to add.
+      */
+     public void addDisconnectListener(GridDisconnectListener lsnr) {
+         disconnectLsnrs.add(lsnr);
+     }
+ 
+     /**
+      * @param topic Listener's topic.
+      * @param lsnr Listener to add.
+      */
+     @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
+     public void addMessageListener(Object topic, final GridMessageListener lsnr) {
+         assert lsnr != null;
+         assert topic != null;
+ 
+         // Make sure that new topic is not in the list of closed topics.
+         closedTopics.remove(topic);
+ 
+         GridMessageListener lsnrs;
+ 
+         for (;;) {
+             lsnrs = lsnrMap.putIfAbsent(topic, lsnr);
+ 
+             if (lsnrs == null) {
+                 lsnrs = lsnr;
+ 
+                 break;
+             }
+ 
+             assert lsnrs != null;
+ 
+             if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array.
+                 GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr);
+ 
+                 if (lsnrMap.replace(topic, lsnrs, arrLsnr)) {
+                     lsnrs = arrLsnr;
+ 
+                     break;
+                 }
+             }
+             else {
+                 if (((ArrayListener)lsnrs).add(lsnr))
+                     break;
+ 
+                 // Add operation failed because array is already empty and is about to be removed, helping and retrying.
+                 lsnrMap.remove(topic, lsnrs);
+             }
+         }
+ 
+         Map<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
+ 
+         Collection<GridCommunicationMessageSet> msgSets = map != null ? map.values() : null;
+ 
+         if (msgSets != null) {
+             final GridMessageListener lsnrs0 = lsnrs;
+ 
+             boolean success = true;
+ 
+             try {
+                 for (final GridCommunicationMessageSet msgSet : msgSets) {
+                     success = false;
+ 
+                     workersCnt.increment();
+ 
+                     pool(msgSet.policy()).execute(new GridWorker(ctx.gridName(), "msg-worker", log) {
+                         @Override protected void body() {
+                             try {
+                                 unwindMessageSet(msgSet, lsnrs0);
+                             }
+                             finally {
+                                 workersCnt.decrement();
+                             }
+                         }
+                     });
+ 
+                     success = true;
+                 }
+             }
+             catch (RejectedExecutionException e) {
+                 U.error(log, "Failed to process delayed message due to execution rejection. Increase the upper bound " +
+                     "on executor service provided in 'GridConfiguration.getExecutorService()'). Will attempt to " +
+                     "process message in the listener thread instead.", e);
+ 
+                 for (GridCommunicationMessageSet msgSet : msgSets)
+                     unwindMessageSet(msgSet, lsnr);
+             }
+             finally {
+                 // Decrement for last runnable submission of which failed.
+                 if (!success)
+                     workersCnt.decrement();
+             }
+         }
+     }
+ 
+     /**
+      * @param topic Message topic.
+      * @return Whether or not listener was indeed removed.
+      */
+     public boolean removeMessageListener(GridTopic topic) {
+         return removeMessageListener((Object)topic);
+     }
+ 
+     /**
+      * @param topic Message topic.
+      * @return Whether or not listener was indeed removed.
+      */
+     public boolean removeMessageListener(Object topic) {
+         return removeMessageListener(topic, null);
+     }
+ 
+     /**
+      * @param topic Listener's topic.
+      * @param lsnr Listener to remove.
+      * @return Whether or not the lsnr was removed.
+      */
+     @SuppressWarnings("deprecation")
+     public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) {
+         return removeMessageListener((Object)topic, lsnr);
+     }
+ 
+     /**
+      * @param topic Listener's topic.
+      * @param lsnr Listener to remove.
+      * @return Whether or not the lsnr was removed.
+      */
+     @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
+     public boolean removeMessageListener(Object topic, @Nullable final GridMessageListener lsnr) {
+         assert topic != null;
+ 
+         boolean rmv = true;
+ 
+         Collection<GridCommunicationMessageSet> msgSets = null;
+ 
+         // If listener is null, then remove all listeners.
+         if (lsnr == null) {
+             closedTopics.add(topic);
+ 
+             rmv = lsnrMap.remove(topic) != null;
+ 
+             Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
+ 
+             if (map != null)
+                 msgSets = map.values();
+         }
+         else {
+             for (;;) {
+                 GridMessageListener lsnrs = lsnrMap.get(topic);
+ 
+                 // If removing listener before subscription happened.
+                 if (lsnrs == null) {
+                     closedTopics.add(topic);
+ 
+                     Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
+ 
+                     if (map != null)
+                         msgSets = map.values();
+ 
+                     rmv = false;
+ 
+                     break;
+                 }
+                 else {
+                     boolean empty = false;
+ 
+                     if (!(lsnrs instanceof ArrayListener)) {
+                         if (lsnrs.equals(lsnr)) {
+                             if (!lsnrMap.remove(topic, lsnrs))
+                                 continue; // Retry because it can be packed to array listener.
+ 
+                             empty = true;
+                         }
+                         else
+                             rmv = false;
+                     }
+                     else {
+                         ArrayListener arrLsnr = (ArrayListener)lsnrs;
+ 
+                         if (arrLsnr.remove(lsnr))
+                             empty = arrLsnr.isEmpty();
+                         else
+                             // Listener was not found.
+                             rmv = false;
+ 
+                         if (empty)
+                             lsnrMap.remove(topic, lsnrs);
+                     }
+ 
+                     // If removing last subscribed listener.
+                     if (empty) {
+                         closedTopics.add(topic);
+ 
+                         Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
+ 
+                         if (map != null)
+                             msgSets = map.values();
+                     }
+ 
+                     break;
+                 }
+             }
+         }
+ 
+         if (msgSets != null)
+             for (GridCommunicationMessageSet msgSet : msgSets)
+                 ctx.timeout().removeTimeoutObject(msgSet);
+ 
+         if (rmv && log.isDebugEnabled())
+             log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
+ 
+         return rmv;
+     }
+ 
+     /**
+      * Gets sent messages count.
+      *
+      * @return Sent messages count.
+      */
+     public int getSentMessagesCount() {
+         return getSpi().getSentMessagesCount();
+     }
+ 
+     /**
+      * Gets sent bytes count.
+      *
+      * @return Sent bytes count.
+      */
+     public long getSentBytesCount() {
+         return getSpi().getSentBytesCount();
+     }
+ 
+     /**
+      * Gets received messages count.
+      *
+      * @return Received messages count.
+      */
+     public int getReceivedMessagesCount() {
+         return getSpi().getReceivedMessagesCount();
+     }
+ 
+     /**
+      * Gets received bytes count.
+      *
+      * @return Received bytes count.
+      */
+     public long getReceivedBytesCount() {
+         return getSpi().getReceivedBytesCount();
+     }
+ 
+     /**
+      * Gets outbound messages queue size.
+      *
+      * @return Outbound messages queue size.
+      */
+     public int getOutboundMessagesQueueSize() {
+         return getSpi().getOutboundMessagesQueueSize();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void printMemoryStats() {
+         X.println(">>>");
+         X.println(">>> IO manager memory stats [grid=" + ctx.gridName() + ']');
+         X.println(">>>  lsnrMapSize: " + lsnrMap.size());
+         X.println(">>>  msgSetMapSize: " + msgSetMap.size());
+         X.println(">>>  closedTopicsSize: " + closedTopics.sizex());
+         X.println(">>>  discoWaitMapSize: " + waitMap.size());
+     }
+ 
+     /**
+      * Linked chain of listeners.
+      */
+     private static class ArrayListener implements GridMessageListener {
+         /** */
+         private volatile GridMessageListener[] arr;
+ 
+         /**
+          * @param arr Array of listeners.
+          */
+         ArrayListener(GridMessageListener... arr) {
+             this.arr = arr;
+         }
+ 
+         /**
+          * Passes message to the whole chain.
+          *
+          * @param nodeId Node ID.
+          * @param msg Message.
+          */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             GridMessageListener[] arr0 = arr;
+ 
+             if (arr0 == null)
+                 return;
+ 
+             for (GridMessageListener l : arr0)
+                 l.onMessage(nodeId, msg);
+         }
+ 
+         /**
+          * @return {@code true} If this instance is empty.
+          */
+         boolean isEmpty() {
+             return arr == null;
+         }
+ 
+         /**
+          * @param l Listener.
+          * @return {@code true} If listener was removed.
+          */
+         synchronized boolean remove(GridMessageListener l) {
+             GridMessageListener[] arr0 = arr;
+ 
+             if (arr0 == null)
+                 return false;
+ 
+             if (arr0.length == 1) {
+                 if (!arr0[0].equals(l))
+                     return false;
+ 
+                 arr = null;
+ 
+                 return true;
+             }
+ 
+             for (int i = 0; i < arr0.length; i++) {
+                 if (arr0[i].equals(l)) {
+                     int newLen = arr0.length - 1;
+ 
+                     if (i == newLen) // Remove last.
+                         arr = Arrays.copyOf(arr0, newLen);
+                     else {
+                         GridMessageListener[] arr1 = new GridMessageListener[newLen];
+ 
+                         if (i != 0) // Not remove first.
+                             System.arraycopy(arr0, 0, arr1, 0, i);
+ 
+                         System.arraycopy(arr0, i + 1, arr1, i, newLen - i);
+ 
+                         arr = arr1;
+                     }
+ 
+                     return true;
+                 }
+             }
+ 
+             return false;
+         }
+ 
+         /**
+          * @param l Listener.
+          * @return {@code true} if listener was added. Add can fail if this instance is empty and is about to be removed
+          *         from map.
+          */
+         synchronized boolean add(GridMessageListener l) {
+             GridMessageListener[] arr0 = arr;
+ 
+             if (arr0 == null)
+                 return false;
+ 
+             int oldLen = arr0.length;
+ 
+             arr0 = Arrays.copyOf(arr0, oldLen + 1);
+ 
+             arr0[oldLen] = l;
+ 
+             arr = arr0;
+ 
+             return true;
+         }
+     }
+ 
+     /**
+      * This class represents a message listener wrapper that knows about peer deployment.
+      */
+     private class GridUserMessageListener implements GridMessageListener {
+         /** Predicate listeners. */
+         private final IgniteBiPredicate<UUID, Object> predLsnr;
+ 
+         /** User message topic. */
+         private final Object topic;
+ 
+         /**
+          * @param topic User topic.
+          * @param predLsnr Predicate listener.
+          * @throws IgniteCheckedException If failed to inject resources to predicates.
+          */
+         GridUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr)
+             throws IgniteCheckedException {
+             this.topic = topic;
+             this.predLsnr = predLsnr;
+ 
+             if (predLsnr != null)
+                 ctx.resource().injectGeneric(predLsnr);
+         }
+ 
+         /** {@inheritDoc} */
+         @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
+             "OverlyStrongTypeCast"})
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             if (!(msg instanceof GridIoUserMessage)) {
+                 U.error(log, "Received unknown message (potentially fatal problem): " + msg);
+ 
+                 return;
+             }
+ 
+             GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
+ 
+             ClusterNode node = ctx.discovery().node(nodeId);
+ 
+             if (node == null) {
+                 U.warn(log, "Failed to resolve sender node (did the node left grid?): " + nodeId);
+ 
+                 return;
+             }
+ 
+             Object msgBody = ioMsg.body();
+ 
+             assert msgBody != null || ioMsg.bodyBytes() != null;
+ 
+             try {
+                 byte[] msgTopicBytes = ioMsg.topicBytes();
+ 
+                 Object msgTopic = ioMsg.topic();
+ 
+                 GridDeployment dep = ioMsg.deployment();
+ 
+                 if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+                     ioMsg.deploymentClassName() != null) {
+                     dep = ctx.deploy().getGlobalDeployment(
+                         ioMsg.deploymentMode(),
+                         ioMsg.deploymentClassName(),
+                         ioMsg.deploymentClassName(),
+                         ioMsg.userVersion(),
+                         nodeId,
+                         ioMsg.classLoaderId(),
+                         ioMsg.loaderParticipants(),
+                         null);
+ 
+                     if (dep == null)
+                         throw new IgniteDeploymentException(
+                             "Failed to obtain deployment information for user message. " +
+                             "If you are using custom message or topic class, try implementing " +
+                             "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+ 
+                     ioMsg.deployment(dep); // Cache deployment.
+                 }
+ 
+                 // Unmarshall message topic if needed.
+                 if (msgTopic == null && msgTopicBytes != null) {
+                     msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+ 
+                     ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+                 }
+ 
+                 if (!F.eq(topic, msgTopic))
+                     return;
+ 
+                 if (msgBody == null) {
+                     msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+ 
+                     ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+                 }
+ 
+                 // Resource injection.
+                 if (dep != null)
+                     ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+                     msg + ']', e);
+             }
+ 
+             if (msgBody != null) {
+                 if (predLsnr != null) {
+                     if (!predLsnr.apply(nodeId, msgBody))
+                         removeMessageListener(TOPIC_COMM_USER, this);
+                 }
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean equals(Object o) {
+             if (this == o)
+                 return true;
+ 
+             if (o == null || getClass() != o.getClass())
+                 return false;
+ 
+             GridUserMessageListener l = (GridUserMessageListener)o;
+ 
+             return F.eq(predLsnr, l.predLsnr) && F.eq(topic, l.topic);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int hashCode() {
+             int res = predLsnr != null ? predLsnr.hashCode() : 0;
+ 
+             res = 31 * res + (topic != null ? topic.hashCode() : 0);
+ 
+             return res;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(GridUserMessageListener.class, this);
+         }
+     }
+ 
+     /**
+      * Ordered communication message set.
+      */
+     private class GridCommunicationMessageSet implements GridTimeoutObject {
+         /** */
+         private final UUID nodeId;
+ 
+         /** */
+         private long endTime;
+ 
+         /** */
+         private final IgniteUuid timeoutId;
+ 
+         /** */
+         @GridToStringInclude
+         private final Object topic;
+ 
+         /** */
+         private final GridIoPolicy plc;
+ 
+         /** */
+         @GridToStringInclude
+         private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new ConcurrentLinkedDeque<>();
+ 
+         /** */
+         private final AtomicBoolean reserved = new AtomicBoolean();
+ 
+         /** */
+         private final long timeout;
+ 
+         /** */
+         private final boolean skipOnTimeout;
+ 
+         /** */
+         private long lastTs;
+ 
+         /**
+          * @param plc Communication policy.
+          * @param topic Communication topic.
+          * @param nodeId Node ID.
+          * @param timeout Timeout.
+          * @param skipOnTimeout Whether message can be skipped on timeout.
+          * @param msg Message to add immediately.
+          */
+         GridCommunicationMessageSet(
+             GridIoPolicy plc,
+             Object topic,
+             UUID nodeId,
+             long timeout,
+             boolean skipOnTimeout,
+             GridIoMessage msg
+         ) {
+             assert nodeId != null;
+             assert topic != null;
+             assert plc != null;
+             assert msg != null;
+ 
+             this.plc = plc;
+             this.nodeId = nodeId;
+             this.topic = topic;
+             this.timeout = timeout == 0 ? ctx.config().getNetworkTimeout() : timeout;
+             this.skipOnTimeout = skipOnTimeout;
+ 
+             endTime = endTime(timeout);
+ 
+             timeoutId = IgniteUuid.randomUuid();
+ 
+             lastTs = U.currentTimeMillis();
+ 
+             msgs.add(F.t(msg, lastTs));
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgniteUuid timeoutId() {
+             return timeoutId;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public long endTime() {
+             return endTime;
+         }
+ 
+         /** {@inheritDoc} */
+         @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+         @Override public void onTimeout() {
+             GridMessageListener lsnr = lsnrMap.get(topic);
+ 
+             if (lsnr != null) {
+                 long delta = 0;
+ 
+                 if (skipOnTimeout) {
+                     while (true) {
+                         delta = 0;
+ 
+                         boolean unwind = false;
+ 
+                         synchronized (this) {
+                             if (!msgs.isEmpty()) {
+                                 delta = U.currentTimeMillis() - lastTs;
+ 
+                                 if (delta >= timeout)
+                                     unwind = true;
+                             }
+                         }
+ 
+                         if (unwind)
+                             unwindMessageSet(this, lsnr);
+                         else
+                             break;
+                     }
+                 }
+ 
+                 // Someone is still listening to messages, so delay set removal.
+                 endTime = endTime(timeout - delta);
+ 
+                 ctx.timeout().addTimeoutObject(this);
+ 
+                 return;
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Removing message set due to timeout: " + this);
+ 
+             ConcurrentMap<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
+ 
+             if (map != null) {
+                 boolean rmv;
+ 
+                 synchronized (map) {
+                     rmv = map.remove(nodeId, this) && map.isEmpty();
+                 }
+ 
+                 if (rmv)
+                     msgSetMap.remove(topic, map);
+             }
+         }
+ 
+         /**
+          * @return ID of node that sent the messages in the set.
+          */
+         UUID nodeId() {
+             return nodeId;
+         }
+ 
+         /**
+          * @return Communication policy.
+          */
+         GridIoPolicy policy() {
+             return plc;
+         }
+ 
+         /**
+          * @return Message topic.
+          */
+         Object topic() {
+             return topic;
+         }
+ 
+         /**
+          * @return {@code True} if successful.
+          */
+         boolean reserve() {
+             return reserved.compareAndSet(false, true);
+         }
+ 
+         /**
+          * @return {@code True} if set is reserved.
+          */
+         boolean reserved() {
+             return reserved.get();
+         }
+ 
+         /**
+          * Releases reservation.
+          */
+         void release() {
+             assert reserved.get() : "Message set was not reserved: " + this;
+ 
+             reserved.set(false);
+         }
+ 
+         /**
+          * @param lsnr Listener to notify.
+          */
+         void unwind(GridMessageListener lsnr) {
+             assert reserved.get();
+ 
+             for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != null; t = msgs.poll())
+                 lsnr.onMessage(nodeId, t.get1().message());
+         }
+ 
+         /**
+          * @param msg Message to add.
+          */
+         void add(GridIoMessage msg) {
+             msgs.add(F.t(msg, U.currentTimeMillis()));
+         }
+ 
+         /**
+          * @return {@code True} if set has messages to unwind.
+          */
+         boolean changed() {
+             return !msgs.isEmpty();
+         }
+ 
+         /**
+          * Calculates end time with overflow check.
+          *
+          * @param timeout Timeout in milliseconds.
+          * @return End time in milliseconds.
+          */
+         private long endTime(long timeout) {
+             long endTime = U.currentTimeMillis() + timeout;
+ 
+             // Account for overflow.
+             if (endTime < 0)
+                 endTime = Long.MAX_VALUE;
+ 
+             return endTime;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(GridCommunicationMessageSet.class, this);
+         }
+     }
+ 
+     /**
+      *
+      */
+     private static class ConcurrentHashMap0<K, V> extends ConcurrentHashMap8<K, V> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** */
+         private int hash;
+ 
+         /**
+          * @param o Object to be compared for equality with this map.
+          * @return {@code True} only for {@code this}.
+          */
+         @Override public boolean equals(Object o) {
+             return o == this;
+         }
+ 
+         /**
+          * @return Identity hash code.
+          */
+         @Override public int hashCode() {
+             if (hash == 0) {
+                 int hash0 = System.identityHashCode(this);
+ 
+                 hash = hash0 != 0 ? hash0 : -1;
+             }
+ 
+             return hash;
+         }
+     }
+ 
+     /**
+      *
+      */
+     private static class DelayedMessage {
+         /** */
+         private final UUID nodeId;
+ 
+         /** */
+         private final GridIoMessage msg;
+ 
+         /** */
+         private final IgniteRunnable msgC;
+ 
+         /**
+          * @param nodeId Node ID.
+          * @param msg Message.
+          * @param msgC Callback.
+          */
+         private DelayedMessage(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
+             this.nodeId = nodeId;
+             this.msg = msg;
+             this.msgC = msgC;
+         }
+ 
+         /**
+          * @return Message char.
+          */
+         public IgniteRunnable callback() {
+             return msgC;
+         }
+ 
+         /**
+          * @return Message.
+          */
+         public GridIoMessage message() {
+             return msg;
+         }
+ 
+         /**
+          * @return Node id.
+          */
+         public UUID nodeId() {
+             return nodeId;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(DelayedMessage.class, this, super.toString());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 0000000,e7067f2..fd67067
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@@ -1,0 -1,348 +1,344 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.managers.communication;
+ 
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ 
+ /**
+  * Wrapper for all grid messages.
+  */
+ public class GridIoMessage extends GridTcpCommunicationMessageAdapter {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Policy. */
+     private GridIoPolicy plc;
+ 
+     /** Message topic. */
+     @GridToStringInclude
+     @GridDirectTransient
+     private Object topic;
+ 
+     /** Topic bytes. */
+     private byte[] topicBytes;
+ 
+     /** Topic ordinal. */
+     private int topicOrd = -1;
+ 
+     /** Message ordered flag. */
+     private boolean ordered;
+ 
+     /** Message timeout. */
+     private long timeout;
+ 
+     /** Whether message can be skipped on timeout. */
+     private boolean skipOnTimeout;
+ 
+     /** Message. */
+     private GridTcpCommunicationMessageAdapter msg;
+ 
+     /**
+      * No-op constructor to support {@link Externalizable} interface.
+      * This constructor is not meant to be used for other purposes.
+      */
+     public GridIoMessage() {
+         // No-op.
+     }
+ 
+     /**
+      * @param plc Policy.
+      * @param topic Communication topic.
+      * @param topicOrd Topic ordinal value.
+      * @param msg Message.
+      * @param ordered Message ordered flag.
+      * @param timeout Timeout.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      */
+     public GridIoMessage(
+         GridIoPolicy plc,
+         Object topic,
+         int topicOrd,
+         GridTcpCommunicationMessageAdapter msg,
+         boolean ordered,
+         long timeout,
+         boolean skipOnTimeout
+     ) {
+         assert plc != null;
+         assert topic != null;
+         assert topicOrd <= Byte.MAX_VALUE;
+         assert msg != null;
+ 
+         this.plc = plc;
+         this.msg = msg;
+         this.topic = topic;
+         this.topicOrd = topicOrd;
+         this.ordered = ordered;
+         this.timeout = timeout;
+         this.skipOnTimeout = skipOnTimeout;
+     }
+ 
+     /**
+      * @return Policy.
+      */
+     GridIoPolicy policy() {
+         return plc;
+     }
+ 
+     /**
+      * @return Topic.
+      */
+     Object topic() {
+         return topic;
+     }
+ 
+     /**
+      * @param topic Topic.
+      */
+     void topic(Object topic) {
+         this.topic = topic;
+     }
+ 
+     /**
+      * @return Topic bytes.
+      */
+     byte[] topicBytes() {
+         return topicBytes;
+     }
+ 
+     /**
+      * @param topicBytes Topic bytes.
+      */
+     void topicBytes(byte[] topicBytes) {
+         this.topicBytes = topicBytes;
+     }
+ 
+     /**
+      * @return Topic ordinal.
+      */
+     int topicOrdinal() {
+         return topicOrd;
+     }
+ 
+     /**
+      * @return Message.
+      */
+     public Object message() {
+         return msg;
+     }
+ 
+     /**
+      * @return Message timeout.
+      */
+     public long timeout() {
+         return timeout;
+     }
+ 
+     /**
+      * @return Whether message can be skipped on timeout.
+      */
+     public boolean skipOnTimeout() {
+         return skipOnTimeout;
+     }
+ 
+     /**
+      * @return {@code True} if message is ordered, {@code false} otherwise.
+      */
+     boolean isOrdered() {
+         return ordered;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean equals(Object obj) {
+         throw new AssertionError();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         throw new AssertionError();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+     @Override public GridTcpCommunicationMessageAdapter clone() {
+         GridIoMessage _clone = new GridIoMessage();
+ 
+         clone0(_clone);
+ 
+         return _clone;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("RedundantCast")
+     @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+         GridIoMessage _clone = (GridIoMessage)_msg;
+ 
+         _clone.plc = plc;
+         _clone.topic = topic;
+         _clone.topicBytes = topicBytes;
+         _clone.topicOrd = topicOrd;
+         _clone.ordered = ordered;
+         _clone.timeout = timeout;
+         _clone.skipOnTimeout = skipOnTimeout;
+         _clone.msg = msg != null ? (GridTcpCommunicationMessageAdapter)msg.clone() : null;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("all")
+     @Override public boolean writeTo(ByteBuffer buf) {
+         commState.setBuffer(buf);
+ 
+         if (!commState.typeWritten) {
 -            if (!commState.putByte(directType()))
++            if (!commState.putByte(null, directType()))
+                 return false;
+ 
+             commState.typeWritten = true;
+         }
+ 
+         switch (commState.idx) {
+             case 0:
 -                if (!commState.putMessage(msg))
++                if (!commState.putMessage("msg", msg))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 1:
 -                if (!commState.putBoolean(ordered))
++                if (!commState.putBoolean("ordered", ordered))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 2:
 -                if (!commState.putEnum(plc))
++                if (!commState.putEnum("plc", plc))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 3:
 -                if (!commState.putBoolean(skipOnTimeout))
++                if (!commState.putBoolean("skipOnTimeout", skipOnTimeout))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 4:
 -                if (!commState.putLong(timeout))
++                if (!commState.putLong("timeout", timeout))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 5:
 -                if (!commState.putByteArray(topicBytes))
++                if (!commState.putByteArray("topicBytes", topicBytes))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 6:
 -                if (!commState.putInt(topicOrd))
++                if (!commState.putInt("topicOrd", topicOrd))
+                     return false;
+ 
+                 commState.idx++;
+ 
+         }
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("all")
+     @Override public boolean readFrom(ByteBuffer buf) {
+         commState.setBuffer(buf);
+ 
+         switch (commState.idx) {
+             case 0:
 -                Object msg0 = commState.getMessage();
++                msg = (GridTcpCommunicationMessageAdapter)commState.getMessage("msg");
+ 
 -                if (msg0 == MSG_NOT_READ)
++                if (!commState.lastRead())
+                     return false;
+ 
 -                msg = (GridTcpCommunicationMessageAdapter)msg0;
 -
+                 commState.idx++;
+ 
+             case 1:
 -                if (buf.remaining() < 1)
 -                    return false;
++                ordered = commState.getBoolean("ordered");
+ 
 -                ordered = commState.getBoolean();
++                if (!commState.lastRead())
++                    return false;
+ 
+                 commState.idx++;
+ 
+             case 2:
 -                if (buf.remaining() < 1)
 -                    return false;
++                byte plc0 = commState.getByte("plc");
+ 
 -                byte plc0 = commState.getByte();
++                if (!commState.lastRead())
++                    return false;
+ 
+                 plc = GridIoPolicy.fromOrdinal(plc0);
+ 
+                 commState.idx++;
+ 
+             case 3:
 -                if (buf.remaining() < 1)
 -                    return false;
++                skipOnTimeout = commState.getBoolean("skipOnTimeout");
+ 
 -                skipOnTimeout = commState.getBoolean();
++                if (!commState.lastRead())
++                    return false;
+ 
+                 commState.idx++;
+ 
+             case 4:
 -                if (buf.remaining() < 8)
 -                    return false;
++                timeout = commState.getLong("timeout");
+ 
 -                timeout = commState.getLong();
++                if (!commState.lastRead())
++                    return false;
+ 
+                 commState.idx++;
+ 
+             case 5:
 -                byte[] topicBytes0 = commState.getByteArray();
++                topicBytes = commState.getByteArray("topicBytes");
+ 
 -                if (topicBytes0 == BYTE_ARR_NOT_READ)
++                if (!commState.lastRead())
+                     return false;
+ 
 -                topicBytes = topicBytes0;
 -
+                 commState.idx++;
+ 
+             case 6:
 -                if (buf.remaining() < 4)
 -                    return false;
++                topicOrd = commState.getInt("topicOrd");
+ 
 -                topicOrd = commState.getInt();
++                if (!commState.lastRead())
++                    return false;
+ 
+                 commState.idx++;
+ 
+         }
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public byte directType() {
+         return 8;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridIoMessage.class, this);
+     }
+ }