You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:21 UTC

[29/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
index 0000000,87230c2..a6d0907
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
@@@ -1,0 -1,555 +1,548 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.nio;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.net.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.locks.*;
+ 
+ /**
+  * Grid client for NIO server.
+  */
+ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient {
+     /** Socket. */
+     private final Socket sock;
+ 
+     /** Output stream. */
+     private final UnsafeBufferedOutputStream out;
+ 
+     /** Minimum buffered message count. */
+     private final int minBufferedMsgCnt;
+ 
+     /** Communication buffer size ratio. */
+     private final double bufSizeRatio;
+ 
+     /** */
 -    private final GridNioMessageWriter msgWriter;
 -
 -    /** */
+     private final ByteBuffer writeBuf;
+ 
+     /** Creates a socket bound to {@code locHost}, configures it and connects it to {@code addr}.
+      * @param metricsLsnr Metrics listener.
 -     * @param msgWriter Message writer.
+      * @param addr Remote address to connect to.
+      * @param locHost Local address the socket is bound to.
+      * @param connTimeout Connect timeout (must not be negative); passed to {@code Socket.connect}.
+      * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option.
+      * @param sockRcvBuf Socket receive buffer size (applied only if positive).
+      * @param sockSndBuf Socket send buffer size (applied only if positive).
+      * @param bufSize Buffer size (or {@code 0} to disable buffer).
+      * @param minBufferedMsgCnt Minimum buffered message count (must not be negative).
+      * @param bufSizeRatio Communication buffer size ratio (must be strictly between 0 and 1).
+      * @throws IgniteCheckedException If binding, configuring or connecting the socket fails.
+      */
+     public GridTcpCommunicationClient(
+         GridNioMetricsListener metricsLsnr,
 -        GridNioMessageWriter msgWriter,
+         InetSocketAddress addr,
+         InetAddress locHost,
+         long connTimeout,
+         boolean tcpNoDelay,
+         int sockRcvBuf,
+         int sockSndBuf,
+         int bufSize,
+         int minBufferedMsgCnt,
+         double bufSizeRatio
+     ) throws IgniteCheckedException {
+         super(metricsLsnr);
+ 
+         assert metricsLsnr != null;
 -        assert msgWriter != null;
+         assert addr != null;
+         assert locHost != null;
+         assert connTimeout >= 0;
+         assert bufSize >= 0;
+ 
+         A.ensure(minBufferedMsgCnt >= 0,
+             "Value of minBufferedMessageCount property cannot be less than zero.");
+         A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1,
+             "Value of bufSizeRatio property must be between 0 and 1 (exclusive).");
+ 
 -        this.msgWriter = msgWriter;
+         this.minBufferedMsgCnt = minBufferedMsgCnt;
+         this.bufSizeRatio = bufSizeRatio;
+ 
+         writeBuf = ByteBuffer.allocate(8 << 10); // 8192-byte heap buffer.
+ 
+         writeBuf.order(ByteOrder.nativeOrder());
+ 
+         sock = new Socket();
+ 
+         boolean success = false;
+ 
+         try {
+             sock.bind(new InetSocketAddress(locHost, 0)); // Port 0: the system picks the local port.
+ 
+             sock.setTcpNoDelay(tcpNoDelay);
+ 
+             if (sockRcvBuf > 0)
+                 sock.setReceiveBufferSize(sockRcvBuf);
+ 
+             if (sockSndBuf > 0)
+                 sock.setSendBufferSize(sockSndBuf);
+ 
+             sock.connect(addr, (int)connTimeout); // NOTE(review): long is narrowed to int; values above Integer.MAX_VALUE would wrap.
+ 
+             out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize);
+ 
+             success = true;
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to connect to remote host " +
+                 "[addr=" + addr + ", localHost=" + locHost + ']', e);
+         }
+         finally {
+             if (!success) // Any failure above: release the socket before the exception propagates.
+                 U.closeQuiet(sock);
+         }
+     }
+ 
+     /** {@inheritDoc} Runs the closure against the socket's own input and output streams. */
+     @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException {
+         try {
+             handshakeC.applyx(sock.getInputStream(), sock.getOutputStream()); // Raw socket streams, not the buffered 'out' field.
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " +
+                 sock.getRemoteSocketAddress(), e);
+         }
+     }
+ 
+     /** {@inheritDoc} The output stream and socket are closed only when {@code super.close()} returns {@code true}. */
+     @Override public boolean close() {
+         boolean res = super.close();
+ 
+         if (res) {
+             U.closeQuiet(out); // Stream first, then the socket it writes to.
+             U.closeQuiet(sock);
+         }
+ 
+         return res;
+     }
+ 
+     /** {@inheritDoc} Attempts a flush, then closes the underlying stream and the socket unconditionally. */
+     @Override public void forceClose() {
+         super.forceClose();
+ 
+         try {
+             out.flush();
+         }
+         catch (IOException ignored) {
+             // Best-effort flush: an I/O failure here is deliberately ignored.
+         }
+ 
+         // Do not call (directly or indirectly) out.close() here
+         // since it may cause a deadlock.
+         out.forceClose();
+ 
+         U.closeQuiet(sock);
+     }
+ 
+     /** {@inheritDoc} Writes the first {@code len} bytes of {@code data} to the output stream and reports {@code len} to the metrics listener. */
+     @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+         if (closed())
+             throw new IgniteCheckedException("Client was closed: " + this);
+ 
+         try {
+             out.write(data, 0, len);
+ 
+             metricsLsnr.onBytesSent(len);
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
+         }
+ 
+         markUsed(); // Reached only when the write did not throw.
+     }
+ 
+     /** {@inheritDoc} Delegates to {@code U.writeMessageFully} and reports the returned count to the metrics listener as bytes sent. */
+     @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+         throws IgniteCheckedException {
+         if (closed())
+             throw new IgniteCheckedException("Client was closed: " + this);
+ 
+         assert writeBuf.hasArray();
+ 
+         try {
 -            int cnt = msgWriter.writeFully(nodeId, msg, out, writeBuf);
++            int cnt = U.writeMessageFully(msg, out, writeBuf);
+ 
+             metricsLsnr.onBytesSent(cnt);
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
+         }
+ 
+         markUsed();
+ 
+         return false; // This implementation always returns false.
+     }
+ 
+     /** Delegates to the output stream's {@code flushOnTimeout}.
+      * @param timeout Timeout (must be positive).
+      * @throws IOException If failed.
+      */
+     @Override public void flushIfNeeded(long timeout) throws IOException {
+         assert timeout > 0;
+ 
+         out.flushOnTimeout(timeout); // NOTE(review): flushOnTimeout asserts a non-null buffer, but bufSize 0 creates none — verify callers.
+     }
+ 
+     /** {@inheritDoc} Not supported by this client: always throws {@link UnsupportedOperationException}. */
+     @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+         throw new UnsupportedOperationException();
+     }
+ 
+     /** {@inheritDoc} Built by {@code S.toString}, which also receives the superclass string. */
+     @Override public String toString() {
+         return S.toString(GridTcpCommunicationClient.class, this, super.toString());
+     }
+ 
+     /**
+      *
+      */
+     private class UnsafeBufferedOutputStream extends FilterOutputStream {
+         /** The internal buffer where data is stored. */
+         private final byte buf[];
+ 
+         /** Current size. */
+         private int size;
+ 
+         /** Count. */
+         private int cnt;
+ 
+         /** Message count. */
+         private int msgCnt;
+ 
+         /** Total messages size. */
+         private int totalCnt;
+ 
+         /** Lock. */
+         private final ReentrantLock lock = new ReentrantLock();
+ 
+         /** Last flushed timestamp. */
+         private volatile long lastFlushed = U.currentTimeMillis();
+ 
+         /** Cached flush timeout. */
+         private volatile long flushTimeout;
+ 
+         /** Buffer adjusted timestamp. */
+         private long lastAdjusted = U.currentTimeMillis();
+ 
+         /**
+          * Creates a new buffered output stream to write data to the
+          * specified underlying output stream, using a buffer of 8192 bytes.
+          *
+          * @param out The underlying output stream.
+          */
+         UnsafeBufferedOutputStream(OutputStream out) {
+             this(out, 8192);
+         }
+ 
+         /**
+          * Creates a new buffered output stream to write data to the
+          * specified underlying output stream with the specified buffer
+          * size.
+          *
+          * @param out The underlying output stream.
+          * @param size The buffer size (must not be negative; {@code 0} means no buffer is allocated).
+          */
+         UnsafeBufferedOutputStream(OutputStream out, int size) {
+             super(out);
+ 
+             assert size >= 0;
+ 
+             this.size = size;
+             buf = size > 0 ? new byte[size] : null; // A null buffer makes write() go straight to the underlying stream.
+         }
+ 
+         /** {@inheritDoc} Single-byte writes are not supported: always throws {@link UnsupportedOperationException}. */
+         @Override public void write(int b) throws IOException {
+             throw new UnsupportedOperationException();
+         }
+ 
+         /** {@inheritDoc} Under the lock, either writes the chunk straight through or accumulates it in the internal buffer. */
+         @Override public void write(byte[] b, int off, int len) throws IOException {
+             assert b != null;
+             assert off == 0; // Only a zero source offset is expected.
+ 
+             // No buffering: write straight through to the underlying stream.
+             if (buf == null) {
+                 lock.lock();
+ 
+                 try {
+                     out.write(b, 0, len);
+                 }
+                 finally {
+                     lock.unlock();
+                 }
+ 
+                 return;
+             }
+ 
+             // Buffering is enabled.
+             lock.lock();
+ 
+             try {
+                 msgCnt++; // Both counters are read and reset by adjustBufferLocked().
+                 totalCnt += len;
+ 
+                 if (len >= size) { // Chunk is not smaller than the current logical size (always true once size is 0): bypass the buffer.
+                     flushLocked();
+ 
+                     out.write(b, 0, len);
+ 
+                     lastFlushed = U.currentTimeMillis();
+ 
+                     adjustBufferIfNeeded();
+ 
+                     return;
+                 }
+ 
+                 if (cnt + len > size) { // Chunk does not fit into the remaining space: flush, then restart the buffer with it.
+                     flushLocked();
+ 
+                     messageToBuffer0(b, off, len, buf, 0);
+ 
+                     cnt = len;
+ 
+                     assert cnt < size;
+ 
+                     adjustBufferIfNeeded();
+ 
+                     return;
+                 }
+ 
+                 messageToBuffer0(b, 0, len, buf, cnt); // Chunk fits into the remaining space: append it.
+ 
+                 cnt += len;
+ 
+                 if (cnt == size) // Buffer is exactly full.
+                     flushLocked();
+                 else
+                     flushIfNeeded();
+             }
+             finally {
+                 lock.unlock();
+             }
+         }
+ 
+         /** Runs the timeout-based flush if a positive flush timeout has been cached; the lock must be held.
+          * @throws IOException If failed.
+          */
+         private void flushIfNeeded() throws IOException {
+             assert lock.isHeldByCurrentThread();
+             assert buf != null;
+ 
+             long flushTimeout0 = flushTimeout; // Single read of the volatile field.
+ 
+             if (flushTimeout0 > 0)
+                 flushOnTimeoutLocked(flushTimeout0);
+         }
+ 
+         /**
+          * Runs the buffer size adjustment if a positive flush timeout has been cached; the lock must be held.
+          */
+         private void adjustBufferIfNeeded() {
+             assert lock.isHeldByCurrentThread();
+             assert buf != null;
+ 
+             long flushTimeout0 = flushTimeout; // Single read of the volatile field.
+ 
+             if (flushTimeout0 > 0)
+                 adjustBufferLocked(flushTimeout0);
+         }
+ 
+         /** {@inheritDoc} Acquires the lock (blocking) and performs {@code flushLocked()}. */
+         @Override public void flush() throws IOException {
+             lock.lock();
+ 
+             try {
+                 flushLocked();
+             }
+             finally {
+                 lock.unlock();
+             }
+         }
+ 
+         /** Caches the timeout and flushes if at least {@code timeout} has passed since the last flush and the lock is free.
+          * @param timeout Timeout (must be positive).
+          * @throws IOException If failed.
+          */
+         public void flushOnTimeout(long timeout) throws IOException {
+             assert buf != null; // NOTE(review): fails with assertions enabled when the stream was created without a buffer.
+             assert timeout > 0;
+ 
+             // Overwrite cached value (read later by flushIfNeeded() and adjustBufferIfNeeded()).
+             flushTimeout = timeout;
+ 
+             if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock()) // Flushed recently, or lock is busy: return without waiting.
+                 return;
+ 
+             try {
+                 flushOnTimeoutLocked(timeout);
+             }
+             finally {
+                 lock.unlock();
+             }
+         }
+ 
+         /** Flushes and adjusts the buffer size if data is buffered and the timeout has elapsed; the lock must be held.
+          * @param timeout Timeout (must be positive).
+          * @throws IOException If failed.
+          */
+         private void flushOnTimeoutLocked(long timeout) throws IOException {
+             assert lock.isHeldByCurrentThread();
+             assert timeout > 0;
+ 
+             // Double check under the lock: nothing is buffered, or the last flush is too recent.
+             if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis())
+                 return;
+ 
+             flushLocked();
+ 
+             adjustBufferLocked(timeout);
+         }
+ 
+         /** Recomputes the logical buffer size from the write counters once more than {@code timeout} has passed since the last adjustment.
+          * @param timeout Timeout (must be positive).
+          */
+         private void adjustBufferLocked(long timeout) {
+             assert lock.isHeldByCurrentThread();
+             assert timeout > 0;
+ 
+             long time = U.currentTimeMillis();
+ 
+             if (lastAdjusted + timeout < time) {
+                 if (msgCnt <= minBufferedMsgCnt) // Too few writes counted: size 0 makes every write bypass the buffer.
+                     size = 0;
+                 else {
+                     size = (int)(totalCnt * bufSizeRatio); // Fraction of the bytes counted since the last adjustment.
+ 
+                     if (size > buf.length) // Never exceed the allocated array.
+                         size = buf.length;
+                 }
+ 
+                 msgCnt = 0; // Start a new measurement window.
+                 totalCnt = 0;
+ 
+                 lastAdjusted = time;
+             }
+         }
+ 
+         /** Writes any buffered bytes to the underlying stream, flushes it and records the flush time; the lock must be held.
+          * @throws IOException If failed.
+          */
+         private void flushLocked() throws IOException {
+             assert lock.isHeldByCurrentThread();
+ 
+             if (buf != null && cnt > 0) {
+                 out.write(buf, 0, cnt);
+ 
+                 cnt = 0; // Buffer is empty again.
+             }
+ 
+             out.flush(); // Underlying stream is flushed even when nothing was buffered.
+ 
+             lastFlushed = U.currentTimeMillis();
+         }
+ 
+         /** {@inheritDoc} Flushes under the lock, then closes the underlying stream even if the flush failed. */
+         @Override public void close() throws IOException {
+             lock.lock();
+ 
+             try {
+                 flushLocked();
+             }
+             finally {
+                 try {
+                     out.close();
+                 }
+                 finally {
+                     lock.unlock(); // Released last, whatever flush or close threw.
+                 }
+             }
+         }
+ 
+         /**
+          * Forcibly closes underlying stream ignoring any possible exception; does not flush and does not take the lock.
+          */
+         public void forceClose() {
+             try {
+                 out.close();
+             }
+             catch (IOException ignored) {
+                 // No-op: close failures are deliberately ignored here.
+             }
+         }
+ 
+         /** Writes {@code len} via {@code U.intToBytes} at {@code resOff}, then copies the message 4 positions further.
+          * @param b Buffer to copy from.
+          * @param off Offset in source buffer (expected to be zero).
+          * @param len Length (expected to equal {@code b.length}).
+          * @param resBuf Result buffer.
+          * @param resOff Result offset.
+          */
+         private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) { // NOTE(review): no caller in this class.
+             assert b.length == len;
+             assert off == 0;
+             assert resBuf.length >= resOff + len + 4;
+ 
+             U.intToBytes(len, resBuf, resOff);
+ 
+             U.arrayCopy(b, off, resBuf, resOff + 4, len);
+         }
+ 
+         /** Copies {@code len} bytes from {@code b} into {@code resBuf} at {@code resOff} without adding anything.
+          * @param b Buffer to copy from (length included).
+          * @param off Offset in source buffer (expected to be zero).
+          * @param len Length.
+          * @param resBuf Result buffer.
+          * @param resOff Result offset.
+          */
+         private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) {
+             assert off == 0;
+             assert resBuf.length >= resOff + len;
+ 
+             U.arrayCopy(b, off, resBuf, resOff, len);
+         }
+ 
+         /** {@inheritDoc} The string is built while holding the lock. */
+         @Override public String toString() {
+             lock.lock();
+ 
+             try {
+                 return S.toString(UnsafeBufferedOutputStream.class, this);
+             }
+             finally {
+                 lock.unlock();
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 8c9354f,aef2490..892c0de
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@@ -21,8 -28,9 +28,8 @@@ import org.apache.ignite.internal.manag
  import org.apache.ignite.plugin.security.*;
  import org.apache.ignite.spi.securesession.*;
  import org.apache.ignite.spi.swapspace.*;
- import org.gridgain.grid.util.typedef.*;
- import org.gridgain.grid.util.typedef.internal.*;
 -import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
  import org.jetbrains.annotations.*;
  
  import javax.management.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index e4c239a,da2152f..822dae7
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@@ -12,11 -20,11 +20,11 @@@ package org.apache.ignite.spi
  import org.apache.ignite.*;
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.events.*;
 +import org.apache.ignite.plugin.extensions.communication.*;
  import org.apache.ignite.plugin.security.*;
  import org.apache.ignite.spi.swapspace.*;
- import org.gridgain.grid.kernal.managers.communication.*;
- import org.gridgain.grid.kernal.managers.eventstorage.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
 -import org.apache.ignite.internal.util.direct.*;
  import org.jetbrains.annotations.*;
  
  import java.io.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8b204a4,afaae4f..3b0c98c
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@@ -13,24 -21,22 +21,22 @@@ import org.apache.ignite.*
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.configuration.*;
  import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.util.*;
  import org.apache.ignite.lang.*;
- import org.apache.ignite.marshaller.*;
  import org.apache.ignite.resources.*;
  import org.apache.ignite.spi.*;
 +import org.apache.ignite.spi.communication.*;
  import org.apache.ignite.thread.*;
- import org.gridgain.grid.*;
- import org.gridgain.grid.kernal.managers.eventstorage.*;
- import org.gridgain.grid.util.*;
- import org.gridgain.grid.util.direct.*;
- import org.gridgain.grid.util.future.*;
- import org.gridgain.grid.util.ipc.*;
- import org.gridgain.grid.util.ipc.shmem.*;
- import org.gridgain.grid.util.lang.*;
- import org.gridgain.grid.util.nio.*;
- import org.gridgain.grid.util.typedef.*;
- import org.gridgain.grid.util.typedef.internal.*;
- import org.gridgain.grid.util.worker.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
 -import org.apache.ignite.spi.communication.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.ipc.*;
+ import org.apache.ignite.internal.util.ipc.shmem.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.nio.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
  import org.jdk8.backport.*;
  import org.jetbrains.annotations.*;
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 0000000,46a380f..217b6fb
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@@ -1,0 -1,173 +1,173 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.managers.communication;
+ 
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.spi.communication.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ 
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static java.util.concurrent.TimeUnit.*;
+ 
+ /**
+  * Send message test.
+  */
+ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest {
+     /** IP finder. */
+     private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** Sample count. */
+     private static final int SAMPLE_CNT = 1;
+ 
+     /** */
+     private static final byte DIRECT_TYPE = (byte)202;
+ 
+     /** */
+     private int bufSize;
+ 
+     static { // Registers a custom producer for DIRECT_TYPE when the test class is initialized.
+         GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
+             @Override public GridTcpCommunicationMessageAdapter create(byte type) {
+                 return new TestMessage(); // The requested type is ignored: a TestMessage is always created.
+             }
+         }, DIRECT_TYPE);
+     }
+ 
+     /** {@inheritDoc} Installs a TCP discovery SPI using the shared IP finder and a TCP communication SPI using {@code bufSize}. */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+         IgniteConfiguration c = super.getConfiguration(gridName);
+ 
+         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ 
+         discoSpi.setIpFinder(ipFinder);
+ 
+         c.setDiscoverySpi(discoSpi);
+ 
+         TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+ 
+         commSpi.setConnectionBufferSize(bufSize); // Stays 0 unless a test sets bufSize before starting grids.
+ 
+         c.setCommunicationSpi(commSpi);
+ 
+         return c;
+     }
+ 
+     /** Starts two grids with {@code bufSize} left unchanged, runs {@code doSend()} and always stops the grids.
+      * @throws Exception If failed.
+      */
+     public void testSendMessage() throws Exception {
+         try {
+             startGridsMultiThreaded(2);
+ 
+             doSend();
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /** Same scenario as {@code testSendMessage()}, but with the connection buffer size set to 8192.
+      * @throws Exception If failed.
+      */
+     public void testSendMessageWithBuffer() throws Exception {
+         bufSize = 8192; // Picked up by getConfiguration() when the grids start.
+ 
+         try {
+             startGridsMultiThreaded(2);
+ 
+             doSend();
+         }
+         finally {
+             stopAllGrids();
+         }
+     }
+ 
+     /** Sends {@code SAMPLE_CNT} test messages from grid 0 to grid 1 and waits up to 3 seconds for all of them to arrive.
+      * @throws Exception If failed.
+      */
+     private void doSend() throws Exception {
+         GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io();
+         GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io();
+ 
+         String topic = "test-topic";
+ 
+         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
+ 
+         mgr1.addMessageListener(topic, new GridMessageListener() {
+             @Override public void onMessage(UUID nodeId, Object msg) {
+                 latch.countDown(); // One count per received message; the payload is not inspected.
+             }
+         });
+ 
+         long time = System.nanoTime();
+ 
+         for (int i = 1; i <= SAMPLE_CNT; i++) {
+             mgr0.send(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
+ 
+             if (i % 500 == 0) // NOTE(review): never true while SAMPLE_CNT is 1.
+                 info("Sent messages count: " + i);
+         }
+ 
+         assert latch.await(3, SECONDS); // Fails if not all messages were received in time.
+ 
+         time = System.nanoTime() - time;
+ 
+         info(">>>");
+         info(">>> send() time (ms): " + MILLISECONDS.convert(time, NANOSECONDS));
+         info(">>>");
+     }
+ 
+     /** Test message whose {@code writeTo} puts only the direct type byte and whose {@code readFrom} reads nothing. */
+     private static class TestMessage extends GridTcpCommunicationMessageAdapter {
+         /** {@inheritDoc} Cloning is not supported: always throws {@link UnsupportedOperationException}. */
+         @SuppressWarnings("CloneDoesntCallSuperClone")
+         @Override public GridTcpCommunicationMessageAdapter clone() {
+             throw new UnsupportedOperationException();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} Returns the result of putting the direct type byte into {@code buf}. */
+         @Override public boolean writeTo(ByteBuffer buf) {
+             commState.setBuffer(buf);
+ 
 -            return commState.putByte(directType());
++            return commState.putByte(null, directType());
+         }
+ 
+         /** {@inheritDoc} Always returns {@code true} without touching {@code buf}. */
+         @Override public boolean readFrom(ByteBuffer buf) {
+             return true;
+         }
+ 
+         /** {@inheritDoc} Returns {@code DIRECT_TYPE}. */
+         @Override public byte directType() {
+             return DIRECT_TYPE;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 0000000,d70e4b0..6b5003f
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@@ -1,0 -1,563 +1,564 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.testframework;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
++import org.apache.ignite.plugin.extensions.communication.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.swapspace.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ 
+ /**
+  * Test SPI context.
+  */
+ public class GridSpiTestContext implements IgniteSpiContext {
+     /** */
+     private final Collection<ClusterNode> rmtNodes = new ConcurrentLinkedQueue<>();
+ 
+     /** */
+     private ClusterNode locNode;
+ 
+     /** */
+     private final Map<GridLocalEventListener, Set<Integer>> evtLsnrs = new HashMap<>();
+ 
+     /** */
+     @SuppressWarnings("deprecation")
+     private final Collection<GridMessageListener> msgLsnrs = new ArrayList<>();
+ 
+     /** */
+     private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>();
+ 
+     /** */
+     private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>();
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<ClusterNode> remoteNodes() {
+         return rmtNodes;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public ClusterNode localNode() {
+         return locNode;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<ClusterNode> remoteDaemonNodes() {
+         Collection<ClusterNode> daemons = new ArrayList<>();
+ 
+         for (ClusterNode node : rmtNodes) {
+             if (node.isDaemon())
+                 daemons.add(node);
+         }
+ 
+         return daemons;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<ClusterNode> nodes() {
+         Collection<ClusterNode> all = new ArrayList<>(rmtNodes);
+ 
+         if (locNode != null)
+             all.add(locNode);
+ 
+         return all;
+     }
+ 
+     /**
+      * @param locNode Local node.
+      */
+     public void setLocalNode(@Nullable ClusterNode locNode) {
+         this.locNode = locNode;
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override
+     public ClusterNode node(UUID nodeId) {
+         if (locNode != null && locNode.id().equals(nodeId))
+             return locNode;
+ 
+         for (ClusterNode node : rmtNodes) {
+             if (node.id().equals(nodeId))
+                 return node;
+         }
+ 
+         return null;
+     }
+ 
+     /** */
+     public void createLocalNode() {
+         setLocalNode(new GridTestNode(UUID.randomUUID(), createMetrics(1, 1)));
+     }
+ 
+     /**
+      * @param cnt Number of nodes.
+      */
+     public void createRemoteNodes(int cnt) {
+         for (int i = 0; i < cnt; i++)
+             addNode(new GridTestNode(UUID.randomUUID(), createMetrics(1, 1)));
+     }
+ 
+     /** */
+     public void reset() {
+         setLocalNode(null);
+ 
+         rmtNodes.clear();
+     }
+ 
+     /**
+      * @param waitingJobs Waiting jobs count.
+      * @param activeJobs Active jobs count.
+      * @return Metrics adapter.
+      */
+     private ClusterMetricsSnapshot createMetrics(int waitingJobs, int activeJobs) {
+         ClusterMetricsSnapshot metrics = new ClusterMetricsSnapshot();
+ 
+         metrics.setCurrentWaitingJobs(waitingJobs);
+         metrics.setCurrentActiveJobs(activeJobs);
+ 
+         return metrics;
+     }
+ 
+     /**
+      * @param nodes Nodes to reset.
+      * @param rmv Whether nodes that were not passed in should be removed or not.
+      */
+     public void resetNodes(Collection<ClusterNode> nodes, boolean rmv) {
+         for (ClusterNode node : nodes) {
+             assert !node.equals(locNode);
+ 
+             if (!rmtNodes.contains(node))
+                 addNode(node);
+         }
+ 
+         if (rmv) {
+             for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) {
+                 ClusterNode node = iter.next();
+ 
+                 if (!nodes.contains(node)) {
+                     iter.remove();
+ 
+                     notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node));
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Checks whether the given node is the local node of this context.
+      *
+      * @param node Node to check.
+      * @return {@code True} if the node is local, {@code false} otherwise or when no local node is set.
+      */
+     public boolean isLocalNode(ClusterNode node) {
+         // Local node is nullable (see setLocalNode() and reset()), so guard against NPE.
+         return locNode != null && locNode.equals(node);
+     }
+ 
+     /**
+      * @param node Node to add.
+      */
+     public void addNode(ClusterNode node) {
+         rmtNodes.add(node);
+ 
+         notifyListener(new IgniteDiscoveryEvent(locNode, "Node joined", EVT_NODE_JOINED, node));
+     }
+ 
+     /**
+      * @param node Node to remove.
+      */
+     public void removeNode(ClusterNode node) {
+         if (rmtNodes.remove(node))
+             notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node));
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      */
+     public void removeNode(UUID nodeId) {
+         for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) {
+             ClusterNode node = iter.next();
+ 
+             if (node.id().equals(nodeId)) {
+                 iter.remove();
+ 
+                 notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node));
+             }
+         }
+     }
+ 
+     /**
+      * @param node Node to fail.
+      */
+     public void failNode(ClusterNode node) {
+         if (rmtNodes.remove(node))
+             notifyListener(new IgniteDiscoveryEvent(locNode, "Node failed", EVT_NODE_FAILED, node));
+     }
+ 
+     /**
+      * @param node Node for metrics update.
+      */
+     public void updateMetrics(ClusterNode node) {
+         if (locNode.equals(node) || rmtNodes.contains(node))
+             notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated.", EVT_NODE_METRICS_UPDATED, node));
+     }
+ 
+     /** */
+     public void updateAllMetrics() {
+         notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated", EVT_NODE_METRICS_UPDATED, locNode));
+ 
+         for (ClusterNode node : rmtNodes) {
+             notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated", EVT_NODE_METRICS_UPDATED, node));
+         }
+     }
+ 
+     /**
+      * Delivers the event to every registered local event listener whose type filter matches.
+      * A listener registered with an empty set of types receives all events.
+      *
+      * @param evt Event to deliver.
+      */
+     private void notifyListener(IgniteEvent evt) {
+         assert evt.type() > 0;
+ 
+         for (Map.Entry<GridLocalEventListener, Set<Integer>> entry : evtLsnrs.entrySet()) {
+             // Empty type set means "no filter"; otherwise the event type must be listed explicitly.
+             if (F.isEmpty(entry.getValue()) || entry.getValue().contains(evt.type()))
+                 entry.getKey().onEvent(evt);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean pingNode(UUID nodeId) {
+         return node(nodeId) != null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void send(ClusterNode node, Serializable msg, String topic)
+         throws IgniteSpiException {
+         sentMsgs.put(node, msg);
+     }
+ 
+     /**
+      * @param node Node message was sent to.
+      * @return Sent message.
+      */
+     public Serializable getSentMessage(ClusterNode node) {
+         return sentMsgs.get(node);
+     }
+ 
+     /**
+      * @param node Node message was sent to.
+      * @return Sent message.
+      */
+     public Serializable removeSentMessage(ClusterNode node) {
+         return sentMsgs.remove(node);
+     }
+ 
+     /**
+      * @param node Destination node.
+      * @param msg Message.
+      */
+     @SuppressWarnings("deprecation")
+     public void triggerMessage(ClusterNode node, Object msg) {
+         for (GridMessageListener lsnr : msgLsnrs) {
+             lsnr.onMessage(node.id(), msg);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("deprecation")
+     @Override public void addMessageListener(GridMessageListener lsnr, String topic) {
+         msgLsnrs.add(lsnr);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("deprecation")
+     @Override public boolean removeMessageListener(GridMessageListener lsnr, String topic) {
+         return msgLsnrs.remove(lsnr);
+     }
+ 
+     /**
+      * @param type Event type.
+      * @param taskName Task name.
+      * @param taskSesId Session ID.
+      * @param msg Event message.
+      */
+     public void triggerTaskEvent(int type, String taskName, IgniteUuid taskSesId, String msg) {
+         assert type > 0;
+ 
+         triggerEvent(new IgniteTaskEvent(locNode, msg, type, taskSesId, taskName, null, false, null));
+     }
+ 
+     /**
+      * @param evt Event to trigger.
+      */
+     public void triggerEvent(IgniteEvent evt) {
+         notifyListener(evt);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void addLocalEventListener(GridLocalEventListener lsnr, int... types) {
+         Set<Integer> typeSet = F.addIfAbsent(evtLsnrs, lsnr, F.<Integer>newSet());
+ 
+         assert typeSet != null;
+ 
+         if (types != null) {
+             for (int type : types) {
+                 typeSet.add(type);
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean removeLocalEventListener(GridLocalEventListener lsnr) {
+         boolean res = evtLsnrs.containsKey(lsnr);
+ 
+         evtLsnrs.remove(lsnr);
+ 
+         return res;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isEventRecordable(int... types) {
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void recordEvent(IgniteEvent evt) {
+         notifyListener(evt);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void registerPort(int port, IgnitePortProtocol proto) {
+         /* No-op. */
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void deregisterPort(int port, IgnitePortProtocol proto) {
+         /* No-op. */
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void deregisterPorts() {
+         /* No-op. */
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <K, V> V get(String cacheName, K key) throws IgniteCheckedException {
+         assert cacheName != null;
+         assert key != null;
+ 
+         V res = null;
+ 
+         Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName);
+ 
+         CachedObject<V> obj = cache.get(key);
+ 
+         if (obj != null) {
+             if (obj.expire == 0 || obj.expire > System.currentTimeMillis())
+                 res = obj.obj;
+             else
+                 cache.remove(key);
+         }
+ 
+         return res;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <K, V> V put(String cacheName, K key, V val, long ttl) throws IgniteCheckedException {
+         assert cacheName != null;
+         assert key != null;
+         assert ttl >= 0;
+ 
+         long expire = ttl > 0 ? System.currentTimeMillis() + ttl : 0;
+ 
+         CachedObject<V> obj = new CachedObject<>(expire, val);
+ 
+         Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName);
+ 
+         CachedObject<V> prev = cache.put(key, obj);
+ 
+         return prev != null ? prev.obj : null;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Stores {@code val} only when {@code get(cacheName, key)} reports no live (non-expired) value
+      * for {@code key}. The check and the store are two separate steps, so this test implementation
+      * is not atomic.
+      *
+      * @return Existing value if one is present, or {@code null} if there was none and {@code val} was stored.
+      */
+     @SuppressWarnings({"unchecked"})
+     @Override public <K, V> V putIfAbsent(String cacheName, K key, V val, long ttl) throws IgniteCheckedException {
+         V cur = get(cacheName, key);
+ 
+         // Only an absent entry may be populated; an existing value must stay untouched.
+         if (cur == null)
+             put(cacheName, key, val, ttl);
+ 
+         return cur;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <K, V> V remove(String cacheName, K key) throws IgniteCheckedException {
+         assert cacheName != null;
+         assert key != null;
+ 
+         Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName);
+ 
+         CachedObject<V> prev = cache.remove(key);
+ 
+         return prev != null ? prev.obj : null;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Implemented on top of {@code get(cacheName, key)}: the key counts as present only if
+      * that lookup returns a non-null value.
+      */
+     @Override public <K> boolean containsKey(String cacheName, K key) {
+         assert cacheName != null;
+         assert key != null;
+ 
+         boolean res = false;
+ 
+         try {
+             res =  get(cacheName, key) != null;
+         }
+         catch (IgniteCheckedException ignored) {
+             // Deliberately ignored: a failed lookup is reported as "key not present" (res stays false).
+         }
+ 
+         return res;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
+         @Nullable ClassLoader ldr) throws IgniteCheckedException {
+         /* No-op. */
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr)
+         throws IgniteCheckedException {
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> T readFromOffheap(String spaceName, int part, Object key, byte[] keyBytes,
+         @Nullable ClassLoader ldr) throws IgniteCheckedException {
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean removeFromOffheap(@Nullable String spaceName, int part, Object key,
+         @Nullable byte[] keyBytes) throws IgniteCheckedException {
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void writeToOffheap(@Nullable String spaceName, int part, Object key, @Nullable byte[] keyBytes,
+         Object val, @Nullable byte[] valBytes, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+         // No-op.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int partition(String cacheName, Object key) {
+         return -1;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void removeFromSwap(String spaceName, Object key,
+         @Nullable ClassLoader ldr) throws IgniteCheckedException {
+         // No-op.
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) {
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean writeDelta(UUID nodeId, Object msg, ByteBuffer buf) {
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean readDelta(UUID nodeId, Class<?> msgCls, ByteBuffer buf) {
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<GridSecuritySubject> authenticatedSubjects() throws IgniteCheckedException {
+         return Collections.emptyList();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridSecuritySubject authenticatedSubject(UUID subjId) throws IgniteCheckedException {
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
+         @Nullable ClassLoader ldr) throws IgniteCheckedException {
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
 -    @Override public GridTcpMessageFactory messageFactory() {
 -        return new GridTcpMessageFactory() {
++    @Override public MessageFactory messageFactory() {
++        return new MessageFactory() {
+             @Override public GridTcpCommunicationMessageAdapter create(byte type) {
+                 return GridTcpCommunicationMessageFactory.create(type);
+             }
+         };
+     }
+ 
+     /**
+      * Returns the map backing the named test cache, creating and registering an empty one on first access.
+      *
+      * @param cacheName Cache name.
+      * @return Map representing cache.
+      */
+     @SuppressWarnings("unchecked")
+     private <K, V> Map<K, V> getOrCreateCache(String cacheName) {
+         Map<K, V> map = cache.get(cacheName);
+ 
+         if (map == null) {
+             Map<K, V> created = new ConcurrentHashMap<>();
+ 
+             // Atomic registration: if another thread won the race, use its map instead of ours.
+             Map<K, V> prev = cache.putIfAbsent(cacheName, created);
+ 
+             map = prev != null ? prev : created;
+         }
+ 
+         return map;
+     }
+ 
+     /**
+      * Cached object.
+      */
+     private static class CachedObject<V> {
+         /** */
+         private long expire;
+ 
+         /** */
+         private V obj;
+ 
+         /**
+          * @param expire Expire time.
+          * @param obj Object.
+          */
+         private CachedObject(long expire, V obj) {
+             this.expire = expire;
+             this.obj = obj;
+         }
+     }
+ }