You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:38 UTC
[46/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0000000,6996e6f..b918b68
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1,0 -1,2043 +1,2153 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.managers.communication;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
++import org.apache.ignite.plugin.extensions.communication.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.communication.*;
+ import org.apache.ignite.internal.managers.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+ import java.util.Map.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.*;
+ import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy.*;
+
+ /**
+ * Grid communication manager.
+ */
+ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
+ /** Max closed topics to store. */
+ public static final int MAX_CLOSED_TOPICS = 10240;
+
+ /** Listeners by topic. */
+ private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
+
+ /** Disconnect listeners. */
+ private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
+
+ /** Public pool. */
+ private ExecutorService pubPool;
+
+ /** Internal P2P pool. */
+ private ExecutorService p2pPool;
+
+ /** Internal system pool. */
+ private ExecutorService sysPool;
+
+ /** Internal management pool. */
+ private ExecutorService mgmtPool;
+
+ /** Affinity assignment executor service. */
+ private ExecutorService affPool;
+
+ /** Utility cache pool. */
+ private ExecutorService utilityCachePool;
+
+ /** Discovery listener. */
+ private GridLocalEventListener discoLsnr;
+
+ /** */
+ private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap =
+ new ConcurrentHashMap8<>();
+
+ /** Local node ID. */
+ private final UUID locNodeId;
+
+ /** Discovery delay. */
+ private final long discoDelay;
+
+ /** Cache for messages that were received prior to discovery. */
+ private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap =
+ new ConcurrentHashMap8<>();
+
+ /** Communication message listener. */
+ private CommunicationListener<Serializable> commLsnr;
+
+ /** Grid marshaller. */
+ private final IgniteMarshaller marsh;
+
+ /** Busy lock. */
+ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+ /** Lock to sync maps access. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Message cache. */
+ private ThreadLocal<IgniteBiTuple<Object, byte[]>> cacheMsg =
+ new GridThreadLocal<IgniteBiTuple<Object, byte[]>>() {
+ @Nullable @Override protected IgniteBiTuple<Object, byte[]> initialValue() {
+ return null;
+ }
+ };
+
+ /** Fully started flag. When set to true, can send and receive messages. */
+ private volatile boolean started;
+
+ /** Closed topics. */
+ private final GridBoundedConcurrentLinkedHashSet<Object> closedTopics =
+ new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS, MAX_CLOSED_TOPICS, 0.75f, 256,
+ PER_SEGMENT_Q_OPTIMIZED_RMV);
+
+ /** Workers count. */
+ private final LongAdder workersCnt = new LongAdder();
+
++ /** */
++ private int pluginMsg = GridTcpCommunicationMessageFactory.MAX_COMMON_TYPE;
++
++ /** */
++ private Map<Byte, GridTcpCommunicationMessageProducer> pluginMsgs;
++
++ /** */
++ private MessageFactory msgFactory;
++
++ /** */
++ private MessageWriterFactory writerFactory;
++
++ /** */
++ private MessageReaderFactory readerFactory;
++
+ /**
+ * @param ctx Grid kernal context.
+ */
+ @SuppressWarnings("deprecation")
+ public GridIoManager(GridKernalContext ctx) {
+ super(ctx, ctx.config().getCommunicationSpi());
+
+ locNodeId = ctx.localNodeId();
+
+ discoDelay = ctx.config().getDiscoveryStartupDelay();
+
+ marsh = ctx.config().getMarshaller();
+ }
+
+ /**
++ * @param producer Message producer.
++ * @return Message type code.
++ */
++ public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer) {
++ int nextMsg = ++pluginMsg;
++
++ if (nextMsg > Byte.MAX_VALUE)
++ throw new IgniteException();
++
++ if (pluginMsgs == null)
++ pluginMsgs = new HashMap<>();
++
++ pluginMsgs.put((byte)nextMsg, producer);
++
++ return (byte)nextMsg;
++ }
++
++ /**
++ * Initializes manager (called prior to discovery start, but after all other components).
++ */
++ public void initMessageFactory() {
++ final GridTcpCommunicationMessageProducer[] common = GridTcpCommunicationMessageFactory.commonProducers();
++
++ final GridTcpCommunicationMessageProducer[] producers;
++
++ if (pluginMsgs != null) {
++ producers = Arrays.copyOf(common, pluginMsg + 1);
++
++ for (Map.Entry<Byte, GridTcpCommunicationMessageProducer> e : pluginMsgs.entrySet()) {
++ assert producers[e.getKey()] == null : e.getKey();
++
++ producers[e.getKey()] = e.getValue();
++ }
++
++ pluginMsgs = null;
++ }
++ else
++ producers = common;
++
++ msgFactory = new MessageFactory() {
++ @Override public GridTcpCommunicationMessageAdapter create(byte type) {
++ GridTcpCommunicationMessageAdapter msg;
++
++ if (type < 0 || type >= producers.length)
++ msg = GridTcpCommunicationMessageFactory.create(type);
++ else {
++ GridTcpCommunicationMessageProducer producer = producers[type];
++
++ if (producer == null)
++ throw new IllegalStateException("Common message type producer is not registered: " + type);
++
++ msg = producer.create(type);
++ }
++
++ msg.setReader(readerFactory.reader());
++
++ return msg;
++ }
++ };
++ }
++
++ public MessageFactory messageFactory() {
++ assert msgFactory != null;
++
++ return msgFactory;
++ }
++
++ /**
+ * Resets metrics for this manager.
+ */
+ public void resetMetrics() {
+ getSpi().resetMetrics();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public void start() throws IgniteCheckedException {
+ assertParameter(discoDelay > 0, "discoveryStartupDelay > 0");
+
+ startSpi();
+
+ pubPool = ctx.config().getExecutorService();
+ p2pPool = ctx.config().getPeerClassLoadingExecutorService();
+ sysPool = ctx.config().getSystemExecutorService();
+ mgmtPool = ctx.config().getManagementExecutorService();
+ utilityCachePool = ctx.utilityCachePool();
+ affPool = Executors.newFixedThreadPool(1);
+
+ getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
+ @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
+ try {
+ onMessage0(nodeId, (GridIoMessage)msg, msgC);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of unknown type (will ignore): " +
+ msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only via GridProjection API.");
+ }
+ }
+
+ @Override public void onDisconnected(UUID nodeId) {
+ for (GridDisconnectListener lsnr : disconnectLsnrs)
+ lsnr.onNodeDisconnected(nodeId);
+ }
+ });
+
++ MessageWriterFactory[] writerExt = ctx.plugins().extensions(MessageWriterFactory.class);
++
++ if (writerExt != null && writerExt.length > 0)
++ writerFactory = writerExt[0];
++ else {
++ writerFactory = new MessageWriterFactory() {
++ @Override public MessageWriter writer() {
++ return new GridTcpCommunicationMessageWriter();
++ }
++ };
++ }
++
++ MessageReaderFactory[] readerExt = ctx.plugins().extensions(MessageReaderFactory.class);
++
++ if (readerExt != null && readerExt.length > 0)
++ readerFactory = readerExt[0];
++ else {
++ readerFactory = new MessageReaderFactory() {
++ @Override public MessageReader reader() {
++ return new GridTcpCommunicationMessageReader(msgFactory);
++ }
++ };
++ }
++
+ if (log.isDebugEnabled())
+ log.debug(startInfo());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
+ @Override public void onKernalStart0() throws IgniteCheckedException {
+ discoLsnr = new GridLocalEventListener() {
+ @SuppressWarnings({"TooBroadScope", "fallthrough"})
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt instanceof IgniteDiscoveryEvent : "Invalid event: " + evt;
+
+ IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+ UUID nodeId = discoEvt.eventNode().id();
+
+ switch (evt.type()) {
+ case EVT_NODE_JOINED:
+ assert waitMap.get(nodeId) == null; // We can't receive messages from undiscovered nodes.
+
+ break;
+
+ case EVT_NODE_LEFT:
+ case EVT_NODE_FAILED:
+ for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e :
+ msgSetMap.entrySet()) {
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
+
+ GridCommunicationMessageSet set;
+
+ boolean empty;
+
+ synchronized (map) {
+ set = map.remove(nodeId);
+
+ empty = map.isEmpty();
+ }
+
+ if (set != null) {
+ if (log.isDebugEnabled())
+ log.debug("Removed message set due to node leaving grid: " + set);
+
+ // Unregister timeout listener.
+ ctx.timeout().removeTimeoutObject(set);
+
+ // Node may still send stale messages for this topic
+ // even after discovery notification is done.
+ closedTopics.add(set.topic());
+ }
+
+ if (empty)
+ msgSetMap.remove(e.getKey(), map);
+ }
+
+ // Clean up delayed and ordered messages (need exclusive lock).
+ lock.writeLock().lock();
+
+ try {
+ ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Removed messages from discovery startup delay list " +
+ "(sender node left topology): " + waitList);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ break;
+
+ default:
+ assert false : "Unexpected event: " + evt;
+ }
+ }
+ };
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ // Make sure that there are no stale messages due to window between communication
+ // manager start and kernal start.
+ // 1. Process wait list.
+ Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>();
+
+ lock.writeLock().lock();
+
+ try {
+ started = true;
+
+ for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) {
+ if (ctx.discovery().node(e.getKey()) != null) {
+ ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey());
+
+ if (log.isDebugEnabled())
+ log.debug("Processing messages from discovery startup delay list: " + waitList);
+
+ if (waitList != null)
+ delayedMsgs.add(waitList);
+ }
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ // After write lock released.
+ if (!delayedMsgs.isEmpty()) {
+ for (Collection<DelayedMessage> col : delayedMsgs)
+ for (DelayedMessage msg : col)
+ commLsnr.onMessage(msg.nodeId(), msg.message(), msg.callback());
+ }
+
+ // 2. Process messages sets.
+ for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e : msgSetMap.entrySet()) {
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
+
+ for (GridCommunicationMessageSet set : map.values()) {
+ if (ctx.discovery().node(set.nodeId()) == null) {
+ // All map modifications should be synced for consistency.
+ boolean rmv;
+
+ synchronized (map) {
+ rmv = map.remove(set.nodeId(), set);
+ }
+
+ if (rmv) {
+ if (log.isDebugEnabled())
+ log.debug("Removed message set due to node leaving grid: " + set);
+
+ // Unregister timeout listener.
+ ctx.timeout().removeTimeoutObject(set);
+ }
+
+ }
+ }
+
+ boolean rmv;
+
+ synchronized (map) {
+ rmv = map.isEmpty();
+ }
+
+ if (rmv) {
+ msgSetMap.remove(e.getKey(), map);
+
+ // Node may still send stale messages for this topic
+ // even after discovery notification is done.
+ closedTopics.add(e.getKey());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("BusyWait")
+ @Override public void onKernalStop0(boolean cancel) {
+ // No more communication messages.
+ getSpi().setListener(null);
+
+ busyLock.writeLock();
+
+ U.shutdownNow(getClass(), affPool, log);
+
+ boolean interrupted = false;
+
+ while (workersCnt.sum() != 0) {
+ try {
+ Thread.sleep(200);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
+
+ GridEventStorageManager evtMgr = ctx.event();
+
+ if (evtMgr != null && discoLsnr != null)
+ evtMgr.removeLocalEventListener(discoLsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ stopSpi();
+
+ // Clear cache.
+ cacheMsg.set(null);
+
+ if (log.isDebugEnabled())
+ log.debug(stopInfo());
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message bytes.
+ * @param msgC Closure to call when message processing finished.
+ */
+ @SuppressWarnings("fallthrough")
+ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
+ assert nodeId != null;
+ assert msg != null;
+
+ if (!busyLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Received communication message while stopping grid.");
+
+ return;
+ }
+
+ try {
+ // Check discovery.
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']');
+
+ return; // We can't receive messages from non-discovered ones.
+ }
+
+ if (msg.topic() == null) {
+ int topicOrd = msg.topicOrdinal();
+
+ msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) : marsh.unmarshal(msg.topicBytes(), null));
+ }
+
+ if (!started) {
+ lock.readLock().lock();
+
+ try {
+ if (!started) { // Sets to true in write lock, so double checking.
+ // Received message before valid context is set to manager.
+ if (log.isDebugEnabled())
+ log.debug("Adding message to waiting list [senderId=" + nodeId +
+ ", msg=" + msg + ']');
+
+ ConcurrentLinkedDeque8<DelayedMessage> list =
+ F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque());
+
+ assert list != null;
+
+ list.add(new DelayedMessage(nodeId, msg, msgC));
+
+ return;
+ }
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // If message is P2P, then process in P2P service.
+ // This is done to avoid extra waiting and potential deadlocks
+ // as thread pool may not have any available threads to give.
+ GridIoPolicy plc = msg.policy();
+
+ switch (plc) {
+ case P2P_POOL: {
+ processP2PMessage(nodeId, msg, msgC);
+
+ break;
+ }
+
+ case PUBLIC_POOL:
+ case SYSTEM_POOL:
+ case MANAGEMENT_POOL:
+ case AFFINITY_POOL:
+ case UTILITY_CACHE_POOL: {
+ if (msg.isOrdered())
+ processOrderedMessage(nodeId, msg, plc, msgC);
+ else
+ processRegularMessage(nodeId, msg, plc, msgC);
+
+ break;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process message (will ignore): " + msg, e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Gets execution pool for policy.
+ *
+ * @param plc Policy.
+ * @return Execution pool.
+ */
+ private Executor pool(GridIoPolicy plc) {
+ switch (plc) {
+ case P2P_POOL:
+ return p2pPool;
+ case SYSTEM_POOL:
+ return sysPool;
+ case PUBLIC_POOL:
+ return pubPool;
+ case MANAGEMENT_POOL:
+ return mgmtPool;
+ case AFFINITY_POOL:
+ return affPool;
+ case UTILITY_CACHE_POOL:
+ assert utilityCachePool != null : "Utility cache pool is not configured.";
+
+ return utilityCachePool;
+
+ default: {
+ assert false : "Invalid communication policy: " + plc;
+
+ // Never reached.
+ return null;
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param msgC Closure to call when message processing finished.
+ */
+ private void processP2PMessage(
+ final UUID nodeId,
+ final GridIoMessage msg,
+ final IgniteRunnable msgC
+ ) {
+ workersCnt.increment();
+
+ Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
+ threadProcessingMessage(true);
+
+ GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+ if (lsnr == null)
+ return;
+
+ Object obj = msg.message();
+
+ assert obj != null;
+
+ lsnr.onMessage(nodeId, obj);
+ }
+ finally {
+ threadProcessingMessage(false);
+
+ workersCnt.decrement();
+
+ msgC.run();
+ }
+ }
+ };
+
+ try {
+ p2pPool.execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process P2P message due to execution rejection. Increase the upper bound " +
+ "on 'ExecutorService' provided by 'GridConfiguration.getPeerClassLoadingExecutorService()'. " +
+ "Will attempt to process message in the listener thread instead.", e);
+
+ c.run();
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param plc Execution policy.
+ * @param msgC Closure to call when message processing finished.
+ */
+ private void processRegularMessage(
+ final UUID nodeId,
+ final GridIoMessage msg,
+ GridIoPolicy plc,
+ final IgniteRunnable msgC
+ ) {
+ workersCnt.increment();
+
+ Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
+ threadProcessingMessage(true);
+
+ processRegularMessage0(msg, nodeId);
+ }
+ finally {
+ threadProcessingMessage(false);
+
+ workersCnt.decrement();
+
+ msgC.run();
+ }
+ }
+ };
+
+ try {
+ pool(plc).execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +
+ "on 'ExecutorService' provided by 'GridConfiguration.getExecutorService()'. " +
+ "Will attempt to process message in the listener thread instead.", e);
+
+ c.run();
+ }
+ }
+
+ /**
+ * @param msg Message.
+ * @param nodeId Node ID.
+ */
+ @SuppressWarnings("deprecation")
+ private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
+ GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+ if (lsnr == null)
+ return;
+
+ Object obj = msg.message();
+
+ assert obj != null;
+
+ lsnr.onMessage(nodeId, obj);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Ordered message.
+ * @param plc Execution policy.
+ * @param msgC Closure to call when message processing finished ({@code null} for sync processing).
+ */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private void processOrderedMessage(
+ final UUID nodeId,
+ final GridIoMessage msg,
+ final GridIoPolicy plc,
+ @Nullable final IgniteRunnable msgC
+ ) {
+ assert msg != null;
+
+ long timeout = msg.timeout();
+ boolean skipOnTimeout = msg.skipOnTimeout();
+
+ boolean isNew = false;
+
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map;
+
+ GridCommunicationMessageSet set = null;
+
+ while (true) {
+ map = msgSetMap.get(msg.topic());
+
+ if (map == null) {
+ set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg);
+
+ map = new ConcurrentHashMap0<>();
+
+ map.put(nodeId, set);
+
+ ConcurrentMap<UUID, GridCommunicationMessageSet> old = msgSetMap.putIfAbsent(
+ msg.topic(), map);
+
+ if (old != null)
+ map = old;
+ else {
+ isNew = true;
+
+ // Put succeeded.
+ break;
+ }
+ }
+
+ boolean rmv = false;
+
+ synchronized (map) {
+ if (map.isEmpty())
+ rmv = true;
+ else {
+ set = map.get(nodeId);
+
+ if (set == null) {
+ GridCommunicationMessageSet old = map.putIfAbsent(nodeId,
+ set = new GridCommunicationMessageSet(plc, msg.topic(),
+ nodeId, timeout, skipOnTimeout, msg));
+
+ assert old == null;
+
+ isNew = true;
+
+ // Put succeeded.
+ break;
+ }
+ }
+ }
+
+ if (rmv)
+ msgSetMap.remove(msg.topic(), map);
+ else {
+ assert set != null;
+ assert !isNew;
+
+ set.add(msg);
+
+ break;
+ }
+ }
+
+ if (isNew && ctx.discovery().node(nodeId) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Message is ignored as sender has left the grid: " + msg);
+
+ assert map != null;
+
+ boolean rmv;
+
+ synchronized (map) {
+ map.remove(nodeId);
+
+ rmv = map.isEmpty();
+ }
+
+ if (rmv)
+ msgSetMap.remove(msg.topic(), map);
+
+ return;
+ }
+
+ if (isNew && set.endTime() != Long.MAX_VALUE)
+ ctx.timeout().addTimeoutObject(set);
+
+ if (set.reserved()) {
+ // Set is reserved which means that it is currently processed by worker thread.
+ if (msgC != null)
+ msgC.run();
+
+ return;
+ }
+
+ final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+ if (lsnr == null) {
+ if (closedTopics.contains(msg.topic())) {
+ if (log.isDebugEnabled())
+ log.debug("Message is ignored as it came for the closed topic: " + msg);
+
+ assert map != null;
+
+ msgSetMap.remove(msg.topic(), map);
+ }
+ else if (log.isDebugEnabled()) {
+ // Note that we simply keep messages if listener is not
+ // registered yet, until one will be registered.
+ log.debug("Received message for unknown listener (messages will be kept until a " +
+ "listener is registered): " + msg);
+ }
+
+ // Mark the message as processed.
+ if (msgC != null)
+ msgC.run();
+
+ return;
+ }
+
+ if (msgC == null) {
+ // Message from local node can be processed in sync manner.
+ assert locNodeId.equals(nodeId);
+
+ unwindMessageSet(set, lsnr);
+
+ return;
+ }
+
+ // Set is not reserved and new worker should be submitted.
+ workersCnt.increment();
+
+ final GridCommunicationMessageSet msgSet0 = set;
+
+ Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
+ threadProcessingMessage(true);
+
+ unwindMessageSet(msgSet0, lsnr);
+ }
+ finally {
+ threadProcessingMessage(false);
+
+ workersCnt.decrement();
+
+ msgC.run();
+ }
+ }
+ };
+
+ try {
+ pool(plc).execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process ordered message due to execution rejection. " +
+ "Increase the upper bound on executor service provided by corresponding " +
+ "configuration property. Will attempt to process message in the listener " +
+ "thread instead [msgPlc=" + plc + ']', e);
+
+ c.run();
+ }
+ }
+
+ /**
+ * @param msgSet Message set to unwind.
+ * @param lsnr Listener to notify.
+ */
+ private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr) {
+ // Loop until message set is empty or
+ // another thread owns the reservation.
+ while (true) {
+ if (msgSet.reserve()) {
+ try {
+ msgSet.unwind(lsnr);
+ }
+ finally {
+ msgSet.release();
+ }
+
+ // Check outside of reservation block.
+ if (!msgSet.changed()) {
+ if (log.isDebugEnabled())
+ log.debug("Message set has not been changed: " + msgSet);
+
+ break;
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Another thread owns reservation: " + msgSet);
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param topicOrd GridTopic enumeration ordinal.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param ordered Ordered flag.
+ * @param timeout Timeout.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ private void send(
+ ClusterNode node,
+ Object topic,
+ int topicOrd,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc,
+ boolean ordered,
+ long timeout,
+ boolean skipOnTimeout
+ ) throws IgniteCheckedException {
+ assert node != null;
+ assert topic != null;
+ assert msg != null;
+ assert plc != null;
+
+ GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+
+ if (locNodeId.equals(node.id())) {
+ assert plc != P2P_POOL;
+
+ CommunicationListener commLsnr = this.commLsnr;
+
+ if (commLsnr == null)
+ throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
+
+ if (ordered)
+ processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else
+ processRegularMessage0(ioMsg, locNodeId);
+ }
+ else {
++ ioMsg.setWriter(writerFactory.writer());
++
+ if (topicOrd < 0)
+ ioMsg.topicBytes(marsh.marshal(topic));
+
+ try {
+ getSpi().sendMessage(node, ioMsg);
+ }
+ catch (IgniteSpiException e) {
+ throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
+ "TCP connection cannot be established due to firewall issues) " +
+ "[node=" + node + ", topic=" + topic +
+ ", msg=" + msg + ", policy=" + plc + ']', e);
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Id of destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(UUID nodeId, Object topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ send(node, topic, msg, plc);
+ }
+
+ /**
+ * @param nodeId Id of destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ public void send(UUID nodeId, GridTopic topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(ClusterNode node, Object topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
+ send(node, topic, -1, msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(ClusterNode node, GridTopic topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ ClusterNode node,
+ Object topic,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc,
+ long timeout,
+ boolean skipOnTimeout
+ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ }
+
+ /**
+ * @param nodeId Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ UUID nodeId,
+ Object topic,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc,
+ long timeout,
+ boolean skipOnTimeout
+ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ Collection<? extends ClusterNode> nodes,
+ Object topic,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc,
+ long timeout,
+ boolean skipOnTimeout
+ )
+ throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
+ send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(
+ Collection<? extends ClusterNode> nodes,
+ Object topic,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc
+ ) throws IgniteCheckedException {
+ send(nodes, topic, -1, msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(
+ Collection<? extends ClusterNode> nodes,
+ GridTopic topic,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc
+ ) throws IgniteCheckedException {
+ send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+ }
+
+ /**
+ * Sends a peer deployable user message.
+ *
+ * @param nodes Destination nodes.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
+ sendUserMessage(nodes, msg, null, false, 0);
+ }
+
+ /**
+ * Sends a peer deployable user message.
+ *
+ * @param nodes Destination nodes.
+ * @param msg Message to send.
+ * @param topic Message topic to use.
+ * @param ordered Is message ordered?
+ * @param timeout Message timeout in milliseconds for ordered messages.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
+ @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException {
+ boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
+
+ byte[] serMsg = null;
+ byte[] serTopic = null;
+
+ if (!loc) {
+ serMsg = marsh.marshal(msg);
+
+ if (topic != null)
+ serTopic = marsh.marshal(topic);
+ }
+
+ GridDeployment dep = null;
+
+ String depClsName = null;
+
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ Class<?> cls0 = U.detectClass(msg);
+
+ if (U.isJdk(cls0) && topic != null)
+ cls0 = U.detectClass(topic);
+
+ dep = ctx.deploy().deploy(cls0, U.detectClassLoader(cls0));
+
+ if (dep == null)
+ throw new IgniteDeploymentException("Failed to deploy user message: " + msg);
+
+ depClsName = cls0.getName();
+ }
+
+ GridTcpCommunicationMessageAdapter ioMsg = new GridIoUserMessage(
+ msg,
+ serMsg,
+ depClsName,
+ topic,
+ serTopic,
+ dep != null ? dep.classLoaderId() : null,
+ dep != null ? dep.deployMode() : null,
+ dep != null ? dep.userVersion() : null,
+ dep != null ? dep.participants() : null);
+
+ if (ordered)
+ sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
+ else if (loc)
+ send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ else {
+ ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
+
+ Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
+
+ if (locNode != null)
+ send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+
+ if (!rmtNodes.isEmpty())
+ send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ }
+ }
+
+ /**
+ * @param topic Topic to subscribe to.
+ * @param p Message predicate.
+ */
+ public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
+ if (p != null) {
+ try {
+ addMessageListener(TOPIC_COMM_USER,
+ new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * @param topic Topic to unsubscribe from.
+ * @param p Message predicate.
+ */
+ public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) {
+ try {
+ removeMessageListener(TOPIC_COMM_USER,
+ new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param topicOrd Topic ordinal value.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param ordered Ordered flag.
+ * @param timeout Message timeout.
+ * @param skipOnTimeout Whether message can be skipped in timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ private void send(
+ Collection<? extends ClusterNode> nodes,
+ Object topic,
+ int topicOrd,
+ GridTcpCommunicationMessageAdapter msg,
+ GridIoPolicy plc,
+ boolean ordered,
+ long timeout,
+ boolean skipOnTimeout
+ ) throws IgniteCheckedException {
+ assert nodes != null;
+ assert topic != null;
+ assert msg != null;
+ assert plc != null;
+
+ if (!ordered)
+ assert F.find(nodes, null, F.localNode(locNodeId)) == null :
+ "Internal GridGain code should never call the method with local node in a node list.";
+
+ try {
+ // Small optimization, as communication SPIs may have lighter implementation for sending
+ // messages to one node vs. many.
+ if (!nodes.isEmpty()) {
+ boolean first = true;
+
+ for (ClusterNode node : nodes) {
+ GridTcpCommunicationMessageAdapter msg0 = first ? msg : msg.clone();
+
+ first = false;
+
+ send(node, topic, topicOrd, msg0, plc, ordered, timeout, skipOnTimeout);
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
+ msg + ", policy=" + plc + ']');
+ }
+ catch (IgniteSpiException e) {
+ throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " +
+ "TCP connection cannot be established due to firewall issues) " +
+ "[nodes=" + nodes + ", topic=" + topic +
+ ", msg=" + msg + ", policy=" + plc + ']', e);
+ }
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to add.
+ */
+ @SuppressWarnings({"TypeMayBeWeakened", "deprecation"})
+ public void addMessageListener(GridTopic topic, GridMessageListener lsnr) {
+ addMessageListener((Object)topic, lsnr);
+ }
+
+ /**
+ * @param lsnr Listener to add.
+ */
+ public void addDisconnectListener(GridDisconnectListener lsnr) {
+ disconnectLsnrs.add(lsnr);
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to add.
+ */
+ @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
+ public void addMessageListener(Object topic, final GridMessageListener lsnr) {
+ assert lsnr != null;
+ assert topic != null;
+
+ // Make sure that new topic is not in the list of closed topics.
+ closedTopics.remove(topic);
+
+ GridMessageListener lsnrs;
+
+ for (;;) {
+ lsnrs = lsnrMap.putIfAbsent(topic, lsnr);
+
+ if (lsnrs == null) {
+ lsnrs = lsnr;
+
+ break;
+ }
+
+ assert lsnrs != null;
+
+ if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array.
+ GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr);
+
+ if (lsnrMap.replace(topic, lsnrs, arrLsnr)) {
+ lsnrs = arrLsnr;
+
+ break;
+ }
+ }
+ else {
+ if (((ArrayListener)lsnrs).add(lsnr))
+ break;
+
+ // Add operation failed because array is already empty and is about to be removed, helping and retrying.
+ lsnrMap.remove(topic, lsnrs);
+ }
+ }
+
+ Map<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
+
+ Collection<GridCommunicationMessageSet> msgSets = map != null ? map.values() : null;
+
+ if (msgSets != null) {
+ final GridMessageListener lsnrs0 = lsnrs;
+
+ boolean success = true;
+
+ try {
+ for (final GridCommunicationMessageSet msgSet : msgSets) {
+ success = false;
+
+ workersCnt.increment();
+
+ pool(msgSet.policy()).execute(new GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
+ unwindMessageSet(msgSet, lsnrs0);
+ }
+ finally {
+ workersCnt.decrement();
+ }
+ }
+ });
+
+ success = true;
+ }
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process delayed message due to execution rejection. Increase the upper bound " +
+ "on executor service provided in 'GridConfiguration.getExecutorService()'). Will attempt to " +
+ "process message in the listener thread instead.", e);
+
+ for (GridCommunicationMessageSet msgSet : msgSets)
+ unwindMessageSet(msgSet, lsnr);
+ }
+ finally {
+ // Decrement for last runnable submission of which failed.
+ if (!success)
+ workersCnt.decrement();
+ }
+ }
+ }
+
+ /**
+ * @param topic Message topic.
+ * @return Whether or not listener was indeed removed.
+ */
+ public boolean removeMessageListener(GridTopic topic) {
+ return removeMessageListener((Object)topic);
+ }
+
+ /**
+ * @param topic Message topic.
+ * @return Whether or not listener was indeed removed.
+ */
+ public boolean removeMessageListener(Object topic) {
+ return removeMessageListener(topic, null);
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to remove.
+ * @return Whether or not the lsnr was removed.
+ */
+ @SuppressWarnings("deprecation")
+ public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) {
+ return removeMessageListener((Object)topic, lsnr);
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to remove.
+ * @return Whether or not the lsnr was removed.
+ */
+ @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
+ public boolean removeMessageListener(Object topic, @Nullable final GridMessageListener lsnr) {
+ assert topic != null;
+
+ boolean rmv = true;
+
+ Collection<GridCommunicationMessageSet> msgSets = null;
+
+ // If listener is null, then remove all listeners.
+ if (lsnr == null) {
+ closedTopics.add(topic);
+
+ rmv = lsnrMap.remove(topic) != null;
+
+ Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
+
+ if (map != null)
+ msgSets = map.values();
+ }
+ else {
+ for (;;) {
+ GridMessageListener lsnrs = lsnrMap.get(topic);
+
+ // If removing listener before subscription happened.
+ if (lsnrs == null) {
+ closedTopics.add(topic);
+
+ Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
+
+ if (map != null)
+ msgSets = map.values();
+
+ rmv = false;
+
+ break;
+ }
+ else {
+ boolean empty = false;
+
+ if (!(lsnrs instanceof ArrayListener)) {
+ if (lsnrs.equals(lsnr)) {
+ if (!lsnrMap.remove(topic, lsnrs))
+ continue; // Retry because it can be packed to array listener.
+
+ empty = true;
+ }
+ else
+ rmv = false;
+ }
+ else {
+ ArrayListener arrLsnr = (ArrayListener)lsnrs;
+
+ if (arrLsnr.remove(lsnr))
+ empty = arrLsnr.isEmpty();
+ else
+ // Listener was not found.
+ rmv = false;
+
+ if (empty)
+ lsnrMap.remove(topic, lsnrs);
+ }
+
+ // If removing last subscribed listener.
+ if (empty) {
+ closedTopics.add(topic);
+
+ Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
+
+ if (map != null)
+ msgSets = map.values();
+ }
+
+ break;
+ }
+ }
+ }
+
+ if (msgSets != null)
+ for (GridCommunicationMessageSet msgSet : msgSets)
+ ctx.timeout().removeTimeoutObject(msgSet);
+
+ if (rmv && log.isDebugEnabled())
+ log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
+
+ return rmv;
+ }
+
+ /**
+ * Gets sent messages count.
+ *
+ * @return Sent messages count.
+ */
+ public int getSentMessagesCount() {
+ return getSpi().getSentMessagesCount();
+ }
+
+ /**
+ * Gets sent bytes count.
+ *
+ * @return Sent bytes count.
+ */
+ public long getSentBytesCount() {
+ return getSpi().getSentBytesCount();
+ }
+
+ /**
+ * Gets received messages count.
+ *
+ * @return Received messages count.
+ */
+ public int getReceivedMessagesCount() {
+ return getSpi().getReceivedMessagesCount();
+ }
+
+ /**
+ * Gets received bytes count.
+ *
+ * @return Received bytes count.
+ */
+ public long getReceivedBytesCount() {
+ return getSpi().getReceivedBytesCount();
+ }
+
+ /**
+ * Gets outbound messages queue size.
+ *
+ * @return Outbound messages queue size.
+ */
+ public int getOutboundMessagesQueueSize() {
+ return getSpi().getOutboundMessagesQueueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ X.println(">>>");
+ X.println(">>> IO manager memory stats [grid=" + ctx.gridName() + ']');
+ X.println(">>> lsnrMapSize: " + lsnrMap.size());
+ X.println(">>> msgSetMapSize: " + msgSetMap.size());
+ X.println(">>> closedTopicsSize: " + closedTopics.sizex());
+ X.println(">>> discoWaitMapSize: " + waitMap.size());
+ }
+
+ /**
+ * Linked chain of listeners.
+ */
+ private static class ArrayListener implements GridMessageListener {
+ /** */
+ private volatile GridMessageListener[] arr;
+
+ /**
+ * @param arr Array of listeners.
+ */
+ ArrayListener(GridMessageListener... arr) {
+ this.arr = arr;
+ }
+
+ /**
+ * Passes message to the whole chain.
+ *
+ * @param nodeId Node ID.
+ * @param msg Message.
+ */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ GridMessageListener[] arr0 = arr;
+
+ if (arr0 == null)
+ return;
+
+ for (GridMessageListener l : arr0)
+ l.onMessage(nodeId, msg);
+ }
+
+ /**
+ * @return {@code true} If this instance is empty.
+ */
+ boolean isEmpty() {
+ return arr == null;
+ }
+
+ /**
+ * @param l Listener.
+ * @return {@code true} If listener was removed.
+ */
+ synchronized boolean remove(GridMessageListener l) {
+ GridMessageListener[] arr0 = arr;
+
+ if (arr0 == null)
+ return false;
+
+ if (arr0.length == 1) {
+ if (!arr0[0].equals(l))
+ return false;
+
+ arr = null;
+
+ return true;
+ }
+
+ for (int i = 0; i < arr0.length; i++) {
+ if (arr0[i].equals(l)) {
+ int newLen = arr0.length - 1;
+
+ if (i == newLen) // Remove last.
+ arr = Arrays.copyOf(arr0, newLen);
+ else {
+ GridMessageListener[] arr1 = new GridMessageListener[newLen];
+
+ if (i != 0) // Not remove first.
+ System.arraycopy(arr0, 0, arr1, 0, i);
+
+ System.arraycopy(arr0, i + 1, arr1, i, newLen - i);
+
+ arr = arr1;
+ }
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param l Listener.
+ * @return {@code true} if listener was added. Add can fail if this instance is empty and is about to be removed
+ * from map.
+ */
+ synchronized boolean add(GridMessageListener l) {
+ GridMessageListener[] arr0 = arr;
+
+ if (arr0 == null)
+ return false;
+
+ int oldLen = arr0.length;
+
+ arr0 = Arrays.copyOf(arr0, oldLen + 1);
+
+ arr0[oldLen] = l;
+
+ arr = arr0;
+
+ return true;
+ }
+ }
+
+ /**
+ * This class represents a message listener wrapper that knows about peer deployment.
+ */
+ private class GridUserMessageListener implements GridMessageListener {
+ /** Predicate listeners. */
+ private final IgniteBiPredicate<UUID, Object> predLsnr;
+
+ /** User message topic. */
+ private final Object topic;
+
+ /**
+ * @param topic User topic.
+ * @param predLsnr Predicate listener.
+ * @throws IgniteCheckedException If failed to inject resources to predicates.
+ */
+ GridUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr)
+ throws IgniteCheckedException {
+ this.topic = topic;
+ this.predLsnr = predLsnr;
+
+ if (predLsnr != null)
+ ctx.resource().injectGeneric(predLsnr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
+ "OverlyStrongTypeCast"})
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ if (!(msg instanceof GridIoUserMessage)) {
+ U.error(log, "Received unknown message (potentially fatal problem): " + msg);
+
+ return;
+ }
+
+ GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ U.warn(log, "Failed to resolve sender node (did the node left grid?): " + nodeId);
+
+ return;
+ }
+
+ Object msgBody = ioMsg.body();
+
+ assert msgBody != null || ioMsg.bodyBytes() != null;
+
+ try {
+ byte[] msgTopicBytes = ioMsg.topicBytes();
+
+ Object msgTopic = ioMsg.topic();
+
+ GridDeployment dep = ioMsg.deployment();
+
+ if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+ ioMsg.deploymentClassName() != null) {
+ dep = ctx.deploy().getGlobalDeployment(
+ ioMsg.deploymentMode(),
+ ioMsg.deploymentClassName(),
+ ioMsg.deploymentClassName(),
+ ioMsg.userVersion(),
+ nodeId,
+ ioMsg.classLoaderId(),
+ ioMsg.loaderParticipants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentException(
+ "Failed to obtain deployment information for user message. " +
+ "If you are using custom message or topic class, try implementing " +
+ "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+ ioMsg.deployment(dep); // Cache deployment.
+ }
+
+ // Unmarshall message topic if needed.
+ if (msgTopic == null && msgTopicBytes != null) {
+ msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+
+ ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+ }
+
+ if (!F.eq(topic, msgTopic))
+ return;
+
+ if (msgBody == null) {
+ msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+
+ ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+ }
+
+ // Resource injection.
+ if (dep != null)
+ ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+ msg + ']', e);
+ }
+
+ if (msgBody != null) {
+ if (predLsnr != null) {
+ if (!predLsnr.apply(nodeId, msgBody))
+ removeMessageListener(TOPIC_COMM_USER, this);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GridUserMessageListener l = (GridUserMessageListener)o;
+
+ return F.eq(predLsnr, l.predLsnr) && F.eq(topic, l.topic);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = predLsnr != null ? predLsnr.hashCode() : 0;
+
+ res = 31 * res + (topic != null ? topic.hashCode() : 0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridUserMessageListener.class, this);
+ }
+ }
+
+ /**
+ * Ordered communication message set.
+ */
+ private class GridCommunicationMessageSet implements GridTimeoutObject {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private long endTime;
+
+ /** */
+ private final IgniteUuid timeoutId;
+
+ /** */
+ @GridToStringInclude
+ private final Object topic;
+
+ /** */
+ private final GridIoPolicy plc;
+
+ /** */
+ @GridToStringInclude
+ private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new ConcurrentLinkedDeque<>();
+
+ /** */
+ private final AtomicBoolean reserved = new AtomicBoolean();
+
+ /** */
+ private final long timeout;
+
+ /** */
+ private final boolean skipOnTimeout;
+
+ /** */
+ private long lastTs;
+
+ /**
+ * @param plc Communication policy.
+ * @param topic Communication topic.
+ * @param nodeId Node ID.
+ * @param timeout Timeout.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param msg Message to add immediately.
+ */
+ GridCommunicationMessageSet(
+ GridIoPolicy plc,
+ Object topic,
+ UUID nodeId,
+ long timeout,
+ boolean skipOnTimeout,
+ GridIoMessage msg
+ ) {
+ assert nodeId != null;
+ assert topic != null;
+ assert plc != null;
+ assert msg != null;
+
+ this.plc = plc;
+ this.nodeId = nodeId;
+ this.topic = topic;
+ this.timeout = timeout == 0 ? ctx.config().getNetworkTimeout() : timeout;
+ this.skipOnTimeout = skipOnTimeout;
+
+ endTime = endTime(timeout);
+
+ timeoutId = IgniteUuid.randomUuid();
+
+ lastTs = U.currentTimeMillis();
+
+ msgs.add(F.t(msg, lastTs));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return timeoutId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ @Override public void onTimeout() {
+ GridMessageListener lsnr = lsnrMap.get(topic);
+
+ if (lsnr != null) {
+ long delta = 0;
+
+ if (skipOnTimeout) {
+ while (true) {
+ delta = 0;
+
+ boolean unwind = false;
+
+ synchronized (this) {
+ if (!msgs.isEmpty()) {
+ delta = U.currentTimeMillis() - lastTs;
+
+ if (delta >= timeout)
+ unwind = true;
+ }
+ }
+
+ if (unwind)
+ unwindMessageSet(this, lsnr);
+ else
+ break;
+ }
+ }
+
+ // Someone is still listening to messages, so delay set removal.
+ endTime = endTime(timeout - delta);
+
+ ctx.timeout().addTimeoutObject(this);
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Removing message set due to timeout: " + this);
+
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
+
+ if (map != null) {
+ boolean rmv;
+
+ synchronized (map) {
+ rmv = map.remove(nodeId, this) && map.isEmpty();
+ }
+
+ if (rmv)
+ msgSetMap.remove(topic, map);
+ }
+ }
+
+ /**
+ * @return ID of node that sent the messages in the set.
+ */
+ UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Communication policy.
+ */
+ GridIoPolicy policy() {
+ return plc;
+ }
+
+ /**
+ * @return Message topic.
+ */
+ Object topic() {
+ return topic;
+ }
+
+ /**
+ * @return {@code True} if successful.
+ */
+ boolean reserve() {
+ return reserved.compareAndSet(false, true);
+ }
+
+ /**
+ * @return {@code True} if set is reserved.
+ */
+ boolean reserved() {
+ return reserved.get();
+ }
+
+ /**
+ * Releases reservation.
+ */
+ void release() {
+ assert reserved.get() : "Message set was not reserved: " + this;
+
+ reserved.set(false);
+ }
+
+ /**
+ * @param lsnr Listener to notify.
+ */
+ void unwind(GridMessageListener lsnr) {
+ assert reserved.get();
+
+ for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != null; t = msgs.poll())
+ lsnr.onMessage(nodeId, t.get1().message());
+ }
+
+ /**
+ * @param msg Message to add.
+ */
+ void add(GridIoMessage msg) {
+ msgs.add(F.t(msg, U.currentTimeMillis()));
+ }
+
+ /**
+ * @return {@code True} if set has messages to unwind.
+ */
+ boolean changed() {
+ return !msgs.isEmpty();
+ }
+
+ /**
+ * Calculates end time with overflow check.
+ *
+ * @param timeout Timeout in milliseconds.
+ * @return End time in milliseconds.
+ */
+ private long endTime(long timeout) {
+ long endTime = U.currentTimeMillis() + timeout;
+
+ // Account for overflow.
+ if (endTime < 0)
+ endTime = Long.MAX_VALUE;
+
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCommunicationMessageSet.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ConcurrentHashMap0<K, V> extends ConcurrentHashMap8<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int hash;
+
+ /**
+ * @param o Object to be compared for equality with this map.
+ * @return {@code True} only for {@code this}.
+ */
+ @Override public boolean equals(Object o) {
+ return o == this;
+ }
+
+ /**
+ * @return Identity hash code.
+ */
+ @Override public int hashCode() {
+ if (hash == 0) {
+ int hash0 = System.identityHashCode(this);
+
+ hash = hash0 != 0 ? hash0 : -1;
+ }
+
+ return hash;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class DelayedMessage {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private final GridIoMessage msg;
+
+ /** */
+ private final IgniteRunnable msgC;
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param msgC Callback.
+ */
+ private DelayedMessage(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
+ this.nodeId = nodeId;
+ this.msg = msg;
+ this.msgC = msgC;
+ }
+
+ /**
+ * @return Message char.
+ */
+ public IgniteRunnable callback() {
+ return msgC;
+ }
+
+ /**
+ * @return Message.
+ */
+ public GridIoMessage message() {
+ return msg;
+ }
+
+ /**
+ * @return Node id.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DelayedMessage.class, this, super.toString());
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 0000000,e7067f2..fd67067
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@@ -1,0 -1,348 +1,344 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.managers.communication;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.tostring.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Wrapper for all grid messages.
+ */
+ public class GridIoMessage extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Policy. */
+ private GridIoPolicy plc;
+
+ /** Message topic. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Object topic;
+
+ /** Topic bytes. */
+ private byte[] topicBytes;
+
+ /** Topic ordinal. */
+ private int topicOrd = -1;
+
+ /** Message ordered flag. */
+ private boolean ordered;
+
+ /** Message timeout. */
+ private long timeout;
+
+ /** Whether message can be skipped on timeout. */
+ private boolean skipOnTimeout;
+
+ /** Message. */
+ private GridTcpCommunicationMessageAdapter msg;
+
+ /**
+ * No-op constructor to support {@link Externalizable} interface.
+ * This constructor is not meant to be used for other purposes.
+ */
+ public GridIoMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param plc Policy.
+ * @param topic Communication topic.
+ * @param topicOrd Topic ordinal value.
+ * @param msg Message.
+ * @param ordered Message ordered flag.
+ * @param timeout Timeout.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ */
+ public GridIoMessage(
+ GridIoPolicy plc,
+ Object topic,
+ int topicOrd,
+ GridTcpCommunicationMessageAdapter msg,
+ boolean ordered,
+ long timeout,
+ boolean skipOnTimeout
+ ) {
+ assert plc != null;
+ assert topic != null;
+ assert topicOrd <= Byte.MAX_VALUE;
+ assert msg != null;
+
+ this.plc = plc;
+ this.msg = msg;
+ this.topic = topic;
+ this.topicOrd = topicOrd;
+ this.ordered = ordered;
+ this.timeout = timeout;
+ this.skipOnTimeout = skipOnTimeout;
+ }
+
+ /**
+ * @return Policy.
+ */
+ GridIoPolicy policy() {
+ return plc;
+ }
+
+ /**
+ * @return Topic.
+ */
+ Object topic() {
+ return topic;
+ }
+
+ /**
+ * @param topic Topic.
+ */
+ void topic(Object topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * @return Topic bytes.
+ */
+ byte[] topicBytes() {
+ return topicBytes;
+ }
+
+ /**
+ * @param topicBytes Topic bytes.
+ */
+ void topicBytes(byte[] topicBytes) {
+ this.topicBytes = topicBytes;
+ }
+
+ /**
+ * @return Topic ordinal.
+ */
+ int topicOrdinal() {
+ return topicOrd;
+ }
+
+ /**
+ * @return Message.
+ */
+ public Object message() {
+ return msg;
+ }
+
+ /**
+ * @return Message timeout.
+ */
+ public long timeout() {
+ return timeout;
+ }
+
+ /**
+ * @return Whether message can be skipped on timeout.
+ */
+ public boolean skipOnTimeout() {
+ return skipOnTimeout;
+ }
+
+ /**
+ * @return {@code True} if message is ordered, {@code false} otherwise.
+ */
+ boolean isOrdered() {
+ return ordered;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridIoMessage _clone = new GridIoMessage();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("RedundantCast")
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridIoMessage _clone = (GridIoMessage)_msg;
+
+ _clone.plc = plc;
+ _clone.topic = topic;
+ _clone.topicBytes = topicBytes;
+ _clone.topicOrd = topicOrd;
+ _clone.ordered = ordered;
+ _clone.timeout = timeout;
+ _clone.skipOnTimeout = skipOnTimeout;
+ _clone.msg = msg != null ? (GridTcpCommunicationMessageAdapter)msg.clone() : null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putMessage(msg))
++ if (!commState.putMessage("msg", msg))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putBoolean(ordered))
++ if (!commState.putBoolean("ordered", ordered))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putEnum(plc))
++ if (!commState.putEnum("plc", plc))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putBoolean(skipOnTimeout))
++ if (!commState.putBoolean("skipOnTimeout", skipOnTimeout))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putLong(timeout))
++ if (!commState.putLong("timeout", timeout))
+ return false;
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putByteArray(topicBytes))
++ if (!commState.putByteArray("topicBytes", topicBytes))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putInt(topicOrd))
++ if (!commState.putInt("topicOrd", topicOrd))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- Object msg0 = commState.getMessage();
++ msg = (GridTcpCommunicationMessageAdapter)commState.getMessage("msg");
+
- if (msg0 == MSG_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- msg = (GridTcpCommunicationMessageAdapter)msg0;
-
+ commState.idx++;
+
+ case 1:
- if (buf.remaining() < 1)
- return false;
++ ordered = commState.getBoolean("ordered");
+
- ordered = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 2:
- if (buf.remaining() < 1)
- return false;
++ byte plc0 = commState.getByte("plc");
+
- byte plc0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ plc = GridIoPolicy.fromOrdinal(plc0);
+
+ commState.idx++;
+
+ case 3:
- if (buf.remaining() < 1)
- return false;
++ skipOnTimeout = commState.getBoolean("skipOnTimeout");
+
- skipOnTimeout = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 4:
- if (buf.remaining() < 8)
- return false;
++ timeout = commState.getLong("timeout");
+
- timeout = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 5:
- byte[] topicBytes0 = commState.getByteArray();
++ topicBytes = commState.getByteArray("topicBytes");
+
- if (topicBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- topicBytes = topicBytes0;
-
+ commState.idx++;
+
+ case 6:
- if (buf.remaining() < 4)
- return false;
++ topicOrd = commState.getInt("topicOrd");
+
- topicOrd = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 8;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridIoMessage.class, this);
+ }
+ }