You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:21 UTC
[29/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
index 0000000,87230c2..a6d0907
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
@@@ -1,0 -1,555 +1,548 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.util.nio;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.net.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.locks.*;
+
+ /**
+ * Grid client for NIO server.
+ */
+ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient {
+ /** Socket. */
+ private final Socket sock;
+
+ /** Output stream. */
+ private final UnsafeBufferedOutputStream out;
+
+ /** Minimum buffered message count. */
+ private final int minBufferedMsgCnt;
+
+ /** Communication buffer size ratio. */
+ private final double bufSizeRatio;
+
+ /** Reusable 8 KB write buffer (native byte order) handed to {@code U.writeMessageFully}. */
- private final GridNioMessageWriter msgWriter;
-
- /** */
+ private final ByteBuffer writeBuf;
+
+ /**
+ * @param metricsLsnr Metrics listener.
- * @param msgWriter Message writer.
+ * @param addr Address.
+ * @param locHost Local address.
+ * @param connTimeout Connect timeout.
+ * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option.
+ * @param sockRcvBuf Socket receive buffer.
+ * @param sockSndBuf Socket send buffer.
+ * @param bufSize Buffer size (or {@code 0} to disable buffer).
+ * @param minBufferedMsgCnt Minimum buffered message count.
+ * @param bufSizeRatio Communication buffer size ratio.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridTcpCommunicationClient(
+ GridNioMetricsListener metricsLsnr,
- GridNioMessageWriter msgWriter,
+ InetSocketAddress addr,
+ InetAddress locHost,
+ long connTimeout,
+ boolean tcpNoDelay,
+ int sockRcvBuf,
+ int sockSndBuf,
+ int bufSize,
+ int minBufferedMsgCnt,
+ double bufSizeRatio
+ ) throws IgniteCheckedException {
+ super(metricsLsnr);
+
+ assert metricsLsnr != null;
- assert msgWriter != null;
+ assert addr != null;
+ assert locHost != null;
+ assert connTimeout >= 0;
+ assert bufSize >= 0;
+
+ A.ensure(minBufferedMsgCnt >= 0,
+ "Value of minBufferedMessageCount property cannot be less than zero.");
+ A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1,
+ "Value of bufSizeRatio property must be between 0 and 1 (exclusive).");
+
- this.msgWriter = msgWriter;
+ this.minBufferedMsgCnt = minBufferedMsgCnt;
+ this.bufSizeRatio = bufSizeRatio;
+
+ writeBuf = ByteBuffer.allocate(8 << 10);
+
+ writeBuf.order(ByteOrder.nativeOrder());
+
+ sock = new Socket();
+
+ boolean success = false;
+
+ try {
+ sock.bind(new InetSocketAddress(locHost, 0));
+
+ sock.setTcpNoDelay(tcpNoDelay);
+
+ if (sockRcvBuf > 0)
+ sock.setReceiveBufferSize(sockRcvBuf);
+
+ if (sockSndBuf > 0)
+ sock.setSendBufferSize(sockSndBuf);
+
+ sock.connect(addr, (int)connTimeout);
+
+ out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize);
+
+ success = true;
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to connect to remote host " +
+ "[addr=" + addr + ", localHost=" + locHost + ']', e);
+ }
+ finally {
+ if (!success)
+ U.closeQuiet(sock);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException {
+ try {
+ handshakeC.applyx(sock.getInputStream(), sock.getOutputStream());
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " +
+ sock.getRemoteSocketAddress(), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean close() {
+ boolean res = super.close();
+
+ if (res) {
+ U.closeQuiet(out);
+ U.closeQuiet(sock);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void forceClose() {
+ super.forceClose();
+
+ try {
+ out.flush();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+
+ // Do not call (directly or indirectly) out.close() here
+ // since it may cause a deadlock.
+ out.forceClose();
+
+ U.closeQuiet(sock);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Client was closed: " + this);
+
+ try {
+ out.write(data, 0, len);
+
+ metricsLsnr.onBytesSent(len);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
+ }
+
+ markUsed();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+ throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Client was closed: " + this);
+
+ assert writeBuf.hasArray();
+
+ try {
- int cnt = msgWriter.writeFully(nodeId, msg, out, writeBuf);
++ int cnt = U.writeMessageFully(msg, out, writeBuf);
+
+ metricsLsnr.onBytesSent(cnt);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
+ }
+
+ markUsed();
+
+ return false;
+ }
+
+ /**
+ * @param timeout Timeout.
+ * @throws IOException If failed.
+ */
+ @Override public void flushIfNeeded(long timeout) throws IOException {
+ assert timeout > 0;
+
+ out.flushOnTimeout(timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridTcpCommunicationClient.class, this, super.toString());
+ }
+
+ /**
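+ * Lock-guarded buffering stream over the socket output; its effective buffer size is re-tuned from recent traffic.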
+ */
+ private class UnsafeBufferedOutputStream extends FilterOutputStream {
+ /** The internal buffer where data is stored. */
+ private final byte buf[];
+
+ /** Effective buffer size in use; adjusted at runtime between {@code 0} and {@code buf.length}. */
+ private int size;
+
+ /** Number of bytes currently stored in {@code buf}. */
+ private int cnt;
+
+ /** Number of buffered-mode writes since the last buffer adjustment. */
+ private int msgCnt;
+
+ /** Total bytes passed to buffered-mode writes since the last buffer adjustment. */
+ private int totalCnt;
+
+ /** Lock. */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /** Last flushed timestamp. */
+ private volatile long lastFlushed = U.currentTimeMillis();
+
+ /** Flush timeout cached from the most recent {@code flushOnTimeout} call ({@code 0} until then). */
+ private volatile long flushTimeout;
+
+ /** Buffer adjusted timestamp. */
+ private long lastAdjusted = U.currentTimeMillis();
+
+ /**
+ * Creates a new buffered output stream to write data to the
+ * specified underlying output stream.
+ *
+ * @param out The underlying output stream.
+ */
+ UnsafeBufferedOutputStream(OutputStream out) {
+ this(out, 8192);
+ }
+
+ /**
+ * Creates a new buffered output stream to write data to the
+ * specified underlying output stream with the specified buffer
+ * size.
+ *
+ * @param out The underlying output stream.
+ * @param size The buffer size.
+ */
+ UnsafeBufferedOutputStream(OutputStream out, int size) {
+ super(out);
+
+ assert size >= 0;
+
+ this.size = size;
+ buf = size > 0 ? new byte[size] : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] b, int off, int len) throws IOException {
+ assert b != null;
+ assert off == 0;
+
+ // No buffering.
+ if (buf == null) {
+ lock.lock();
+
+ try {
+ out.write(b, 0, len);
+ }
+ finally {
+ lock.unlock();
+ }
+
+ return;
+ }
+
+ // Buffering is enabled.
+ lock.lock();
+
+ try {
+ msgCnt++;
+ totalCnt += len;
+
+ if (len >= size) {
+ flushLocked();
+
+ out.write(b, 0, len);
+
+ lastFlushed = U.currentTimeMillis();
+
+ adjustBufferIfNeeded();
+
+ return;
+ }
+
+ if (cnt + len > size) {
+ flushLocked();
+
+ messageToBuffer0(b, off, len, buf, 0);
+
+ cnt = len;
+
+ assert cnt < size;
+
+ adjustBufferIfNeeded();
+
+ return;
+ }
+
+ messageToBuffer0(b, 0, len, buf, cnt);
+
+ cnt += len;
+
+ if (cnt == size)
+ flushLocked();
+ else
+ flushIfNeeded();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws IOException If failed.
+ */
+ private void flushIfNeeded() throws IOException {
+ assert lock.isHeldByCurrentThread();
+ assert buf != null;
+
+ long flushTimeout0 = flushTimeout;
+
+ if (flushTimeout0 > 0)
+ flushOnTimeoutLocked(flushTimeout0);
+ }
+
+ /**
+ * Re-tunes the buffer size, but only once a flush timeout has been cached.
+ */
+ private void adjustBufferIfNeeded() {
+ assert lock.isHeldByCurrentThread();
+ assert buf != null;
+
+ long flushTimeout0 = flushTimeout;
+
+ if (flushTimeout0 > 0)
+ adjustBufferLocked(flushTimeout0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush() throws IOException {
+ lock.lock();
+
+ try {
+ flushLocked();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param timeout Timeout.
+ * @throws IOException If failed.
+ */
+ public void flushOnTimeout(long timeout) throws IOException {
+ assert buf != null;
+ assert timeout > 0;
+
+ // Overwrite cached value.
+ flushTimeout = timeout;
+
+ if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock())
+ return;
+
+ try {
+ flushOnTimeoutLocked(timeout);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param timeout Timeout.
+ * @throws IOException If failed.
+ */
+ private void flushOnTimeoutLocked(long timeout) throws IOException {
+ assert lock.isHeldByCurrentThread();
+ assert timeout > 0;
+
+ // Double check.
+ if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis())
+ return;
+
+ flushLocked();
+
+ adjustBufferLocked(timeout);
+ }
+
+ /**
+ * @param timeout Timeout.
+ */
+ private void adjustBufferLocked(long timeout) {
+ assert lock.isHeldByCurrentThread();
+ assert timeout > 0;
+
+ long time = U.currentTimeMillis();
+
+ if (lastAdjusted + timeout < time) {
+ if (msgCnt <= minBufferedMsgCnt)
+ size = 0;
+ else {
+ size = (int)(totalCnt * bufSizeRatio);
+
+ if (size > buf.length)
+ size = buf.length;
+ }
+
+ msgCnt = 0;
+ totalCnt = 0;
+
+ lastAdjusted = time;
+ }
+ }
+
+ /**
+ * @throws IOException If failed.
+ */
+ private void flushLocked() throws IOException {
+ assert lock.isHeldByCurrentThread();
+
+ if (buf != null && cnt > 0) {
+ out.write(buf, 0, cnt);
+
+ cnt = 0;
+ }
+
+ out.flush();
+
+ lastFlushed = U.currentTimeMillis();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ lock.lock();
+
+ try {
+ flushLocked();
+ }
+ finally {
+ try {
+ out.close();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Forcibly closes underlying stream ignoring any possible exception.
+ */
+ public void forceClose() {
+ try {
+ out.close();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+ }
+
+ /**
+ * @param b Buffer to copy from.
+ * @param off Offset in source buffer.
+ * @param len Length.
+ * @param resBuf Result buffer.
+ * @param resOff Result offset.
+ */
+ private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) {
+ assert b.length == len;
+ assert off == 0;
+ assert resBuf.length >= resOff + len + 4;
+
+ U.intToBytes(len, resBuf, resOff);
+
+ U.arrayCopy(b, off, resBuf, resOff + 4, len);
+ }
+
+ /**
+ * @param b Buffer to copy from (length included).
+ * @param off Offset in source buffer.
+ * @param len Length.
+ * @param resBuf Result buffer.
+ * @param resOff Result offset.
+ */
+ private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) {
+ assert off == 0;
+ assert resBuf.length >= resOff + len;
+
+ U.arrayCopy(b, off, resBuf, resOff, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ lock.lock();
+
+ try {
+ return S.toString(UnsafeBufferedOutputStream.class, this);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 8c9354f,aef2490..892c0de
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@@ -21,8 -28,9 +28,8 @@@ import org.apache.ignite.internal.manag
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.securesession.*;
import org.apache.ignite.spi.swapspace.*;
- import org.gridgain.grid.util.typedef.*;
- import org.gridgain.grid.util.typedef.internal.*;
-import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import javax.management.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index e4c239a,da2152f..822dae7
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@@ -12,11 -20,11 +20,11 @@@ package org.apache.ignite.spi
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.swapspace.*;
- import org.gridgain.grid.kernal.managers.communication.*;
- import org.gridgain.grid.kernal.managers.eventstorage.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.util.direct.*;
import org.jetbrains.annotations.*;
import java.io.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8b204a4,afaae4f..3b0c98c
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@@ -13,24 -21,22 +21,22 @@@ import org.apache.ignite.*
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
- import org.apache.ignite.marshaller.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.*;
import org.apache.ignite.thread.*;
- import org.gridgain.grid.*;
- import org.gridgain.grid.kernal.managers.eventstorage.*;
- import org.gridgain.grid.util.*;
- import org.gridgain.grid.util.direct.*;
- import org.gridgain.grid.util.future.*;
- import org.gridgain.grid.util.ipc.*;
- import org.gridgain.grid.util.ipc.shmem.*;
- import org.gridgain.grid.util.lang.*;
- import org.gridgain.grid.util.nio.*;
- import org.gridgain.grid.util.typedef.*;
- import org.gridgain.grid.util.typedef.internal.*;
- import org.gridgain.grid.util.worker.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.spi.communication.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.ipc.*;
+ import org.apache.ignite.internal.util.ipc.shmem.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.nio.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 0000000,46a380f..217b6fb
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@@ -1,0 -1,173 +1,173 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.managers.communication;
+
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.spi.communication.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.testframework.junits.common.*;
+
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+
+ import static java.util.concurrent.TimeUnit.*;
+
+ /**
+ * Send message test.
+ */
+ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Sample count. */
+ private static final int SAMPLE_CNT = 1;
+
+ /** Direct type under which {@link TestMessage} is registered with the message factory. */
+ private static final byte DIRECT_TYPE = (byte)202;
+
+ /** Connection buffer size applied to the communication SPI ({@code 0} unless a test sets it). */
+ private int bufSize;
+
+ static {
+ GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
+ @Override public GridTcpCommunicationMessageAdapter create(byte type) {
+ return new TestMessage();
+ }
+ }, DIRECT_TYPE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(ipFinder);
+
+ c.setDiscoverySpi(discoSpi);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setConnectionBufferSize(bufSize);
+
+ c.setCommunicationSpi(commSpi);
+
+ return c;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSendMessage() throws Exception {
+ try {
+ startGridsMultiThreaded(2);
+
+ doSend();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSendMessageWithBuffer() throws Exception {
+ bufSize = 8192;
+
+ try {
+ startGridsMultiThreaded(2);
+
+ doSend();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doSend() throws Exception {
+ GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io();
+ GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io();
+
+ String topic = "test-topic";
+
+ final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
+
+ mgr1.addMessageListener(topic, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ latch.countDown();
+ }
+ });
+
+ long time = System.nanoTime();
+
+ for (int i = 1; i <= SAMPLE_CNT; i++) {
+ mgr0.send(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
+
+ if (i % 500 == 0)
+ info("Sent messages count: " + i);
+ }
+
+ assert latch.await(3, SECONDS);
+
+ time = System.nanoTime() - time;
+
+ info(">>>");
+ info(">>> send() time (ms): " + MILLISECONDS.convert(time, NANOSECONDS));
+ info(">>>");
+ }
+
+ /** Minimal message whose {@code writeTo} emits only its direct type byte. */
+ private static class TestMessage extends GridTcpCommunicationMessageAdapter {
+ /** {@inheritDoc} */
+ @SuppressWarnings("CloneDoesntCallSuperClone")
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
- return commState.putByte(directType());
++ return commState.putByte(null, directType());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return DIRECT_TYPE;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 0000000,d70e4b0..6b5003f
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@@ -1,0 -1,563 +1,564 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.testframework;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
++import org.apache.ignite.plugin.extensions.communication.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.swapspace.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+
+ import static org.apache.ignite.events.IgniteEventType.*;
+
+ /**
+ * Test SPI context.
+ */
+ public class GridSpiTestContext implements IgniteSpiContext {
+ /** Remote nodes known to this context. */
+ private final Collection<ClusterNode> rmtNodes = new ConcurrentLinkedQueue<>();
+
+ /** Local node, or {@code null} if not set. */
+ private ClusterNode locNode;
+
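+ /** Local event listeners mapped to the event types they subscribed to (an empty set matches all types). */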
+ private final Map<GridLocalEventListener, Set<Integer>> evtLsnrs = new HashMap<>();
+
+ /** Registered message listeners. */
+ @SuppressWarnings("deprecation")
+ private final Collection<GridMessageListener> msgLsnrs = new ArrayList<>();
+
+ /** Last message passed to {@code send} for each destination node. */
+ private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>();
+
+ /** */
+ private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> remoteNodes() {
+ return rmtNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode localNode() {
+ return locNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> remoteDaemonNodes() {
+ Collection<ClusterNode> daemons = new ArrayList<>();
+
+ for (ClusterNode node : rmtNodes) {
+ if (node.isDaemon())
+ daemons.add(node);
+ }
+
+ return daemons;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> nodes() {
+ Collection<ClusterNode> all = new ArrayList<>(rmtNodes);
+
+ if (locNode != null)
+ all.add(locNode);
+
+ return all;
+ }
+
+ /**
+ * @param locNode Local node.
+ */
+ public void setLocalNode(@Nullable ClusterNode locNode) {
+ this.locNode = locNode;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override
+ public ClusterNode node(UUID nodeId) {
+ if (locNode != null && locNode.id().equals(nodeId))
+ return locNode;
+
+ for (ClusterNode node : rmtNodes) {
+ if (node.id().equals(nodeId))
+ return node;
+ }
+
+ return null;
+ }
+
+ /** Creates a test node with a random ID and installs it as the local node. */
+ public void createLocalNode() {
+ setLocalNode(new GridTestNode(UUID.randomUUID(), createMetrics(1, 1)));
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ */
+ public void createRemoteNodes(int cnt) {
+ for (int i = 0; i < cnt; i++)
+ addNode(new GridTestNode(UUID.randomUUID(), createMetrics(1, 1)));
+ }
+
+ /** Clears the local node and removes all remote nodes. */
+ public void reset() {
+ setLocalNode(null);
+
+ rmtNodes.clear();
+ }
+
+ /**
+ * @param waitingJobs Waiting jobs count.
+ * @param activeJobs Active jobs count.
+ * @return Metrics adapter.
+ */
+ private ClusterMetricsSnapshot createMetrics(int waitingJobs, int activeJobs) {
+ ClusterMetricsSnapshot metrics = new ClusterMetricsSnapshot();
+
+ metrics.setCurrentWaitingJobs(waitingJobs);
+ metrics.setCurrentActiveJobs(activeJobs);
+
+ return metrics;
+ }
+
+ /**
+ * @param nodes Nodes to reset.
+ * @param rmv Whether nodes that were not passed in should be removed or not.
+ */
+ public void resetNodes(Collection<ClusterNode> nodes, boolean rmv) {
+ for (ClusterNode node : nodes) {
+ assert !node.equals(locNode);
+
+ if (!rmtNodes.contains(node))
+ addNode(node);
+ }
+
+ if (rmv) {
+ for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) {
+ ClusterNode node = iter.next();
+
+ if (!nodes.contains(node)) {
+ iter.remove();
+
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node));
+ }
+ }
+ }
+ }
+
+ /**
+ * @param node Node to check.
+ * @return {@code True} if the node is local.
+ */
+ public boolean isLocalNode(ClusterNode node) {
+ return locNode.equals(node);
+ }
+
+ /**
+ * @param node Node to add.
+ */
+ public void addNode(ClusterNode node) {
+ rmtNodes.add(node);
+
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Node joined", EVT_NODE_JOINED, node));
+ }
+
+ /**
+ * @param node Node to remove.
+ */
+ public void removeNode(ClusterNode node) {
+ if (rmtNodes.remove(node))
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node));
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ public void removeNode(UUID nodeId) {
+ for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) {
+ ClusterNode node = iter.next();
+
+ if (node.id().equals(nodeId)) {
+ iter.remove();
+
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node));
+ }
+ }
+ }
+
+ /**
+ * @param node Node to fail.
+ */
+ public void failNode(ClusterNode node) {
+ if (rmtNodes.remove(node))
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Node failed", EVT_NODE_FAILED, node));
+ }
+
+ /**
+ * @param node Node for metrics update.
+ */
+ public void updateMetrics(ClusterNode node) {
+ if (locNode.equals(node) || rmtNodes.contains(node))
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated.", EVT_NODE_METRICS_UPDATED, node));
+ }
+
+ /** Fires a metrics-updated event for the local node and for every remote node. */
+ public void updateAllMetrics() {
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated", EVT_NODE_METRICS_UPDATED, locNode));
+
+ for (ClusterNode node : rmtNodes) {
+ notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated", EVT_NODE_METRICS_UPDATED, node));
+ }
+ }
+
+ /**
+ * @param evt Event to deliver to the subscribed listeners.
+ */
+ private void notifyListener(IgniteEvent evt) {
+ assert evt.type() > 0;
+
+ for (Map.Entry<GridLocalEventListener, Set<Integer>> entry : evtLsnrs.entrySet()) {
+ if (F.isEmpty(entry.getValue()) || entry.getValue().contains(evt.type()))
+ entry.getKey().onEvent(evt);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean pingNode(UUID nodeId) {
+ return node(nodeId) != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void send(ClusterNode node, Serializable msg, String topic)
+ throws IgniteSpiException {
+ sentMsgs.put(node, msg);
+ }
+
+ /**
+ * @param node Node message was sent to.
+ * @return Sent message.
+ */
+ public Serializable getSentMessage(ClusterNode node) {
+ return sentMsgs.get(node);
+ }
+
+ /**
+ * @param node Node message was sent to.
+ * @return Sent message.
+ */
+ public Serializable removeSentMessage(ClusterNode node) {
+ return sentMsgs.remove(node);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param msg Message.
+ */
+ @SuppressWarnings("deprecation")
+ public void triggerMessage(ClusterNode node, Object msg) {
+ for (GridMessageListener lsnr : msgLsnrs) {
+ lsnr.onMessage(node.id(), msg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public void addMessageListener(GridMessageListener lsnr, String topic) {
+ msgLsnrs.add(lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public boolean removeMessageListener(GridMessageListener lsnr, String topic) {
+ return msgLsnrs.remove(lsnr);
+ }
+
+ /**
+  * Builds an {@code IgniteTaskEvent} for the local node and passes it to
+  * {@link #triggerEvent(IgniteEvent)}. The remaining constructor arguments of the event
+  * are passed as {@code null}, {@code false} and {@code null}.
+  *
+  * @param type Event type, asserted to be positive.
+  * @param taskName Task name.
+  * @param taskSesId Session ID.
+  * @param msg Event message.
+  */
+ public void triggerTaskEvent(int type, String taskName, IgniteUuid taskSesId, String msg) {
+     assert type > 0;
+
+     IgniteEvent evt = new IgniteTaskEvent(locNode, msg, type, taskSesId, taskName, null, false, null);
+
+     triggerEvent(evt);
+ }
+
+ /**
+  * Delivers the event to the registered local event listeners via {@code notifyListener}.
+  *
+  * @param evt Event to trigger.
+  */
+ public void triggerEvent(IgniteEvent evt) {
+     notifyListener(evt);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the listener is registered in {@code evtLsnrs} (if not there yet) and the given
+  * types are added to its type set. A {@code null} {@code types} array adds nothing.
+  */
+ @Override public void addLocalEventListener(GridLocalEventListener lsnr, int... types) {
+     Set<Integer> accepted = F.addIfAbsent(evtLsnrs, lsnr, F.<Integer>newSet());
+
+     assert accepted != null;
+
+     if (types == null)
+         return;
+
+     for (int t : types)
+         accepted.add(t);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the listener is removed from {@code evtLsnrs}.
+  *
+  * @return {@code true} if the listener was registered and has been removed.
+  */
+ @Override public boolean removeLocalEventListener(GridLocalEventListener lsnr) {
+     // Single map operation instead of containsKey() followed by remove(): the result
+     // reflects exactly what this call removed. Values stored by addLocalEventListener
+     // are never null, so a non-null result means the listener was present.
+     return evtLsnrs.remove(lsnr) != null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here every event type is reported as recordable; the argument is not inspected.
+  */
+ @Override public boolean isEventRecordable(int... types) {
+     return true;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the event is delivered to the registered local event listeners via
+  * {@code notifyListener}.
+  */
+ @Override public void recordEvent(IgniteEvent evt) {
+     notifyListener(evt);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation does nothing.
+  */
+ @Override public void registerPort(int port, IgnitePortProtocol proto) {
+     // No-op.
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation does nothing.
+  */
+ @Override public void deregisterPort(int port, IgnitePortProtocol proto) {
+     // No-op.
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation does nothing.
+  */
+ @Override public void deregisterPorts() {
+     // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> V get(String cacheName, K key) throws IgniteCheckedException {
+ assert cacheName != null;
+ assert key != null;
+
+ V res = null;
+
+ Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName);
+
+ CachedObject<V> obj = cache.get(key);
+
+ if (obj != null) {
+ if (obj.expire == 0 || obj.expire > System.currentTimeMillis())
+ res = obj.obj;
+ else
+ cache.remove(key);
+ }
+
+ return res;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the value is wrapped together with its expire time and stored in the map returned
+  * by {@code getOrCreateCache(cacheName)}. A {@code ttl} of {@code 0} is stored as expire
+  * time {@code 0}; otherwise the expire time is the current time plus {@code ttl}.
+  *
+  * @return Value of the entry previously stored under the key, or {@code null} if none.
+  */
+ @Override public <K, V> V put(String cacheName, K key, V val, long ttl) throws IgniteCheckedException {
+     assert cacheName != null;
+     assert key != null;
+     assert ttl >= 0;
+
+     long expireAt = 0;
+
+     if (ttl > 0)
+         expireAt = System.currentTimeMillis() + ttl;
+
+     CachedObject<V> wrapped = new CachedObject<>(expireAt, val);
+
+     Map<K, CachedObject<V>> entries = getOrCreateCache(cacheName);
+
+     CachedObject<V> old = entries.put(key, wrapped);
+
+     if (old == null)
+         return null;
+
+     return old.obj;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the value is stored only when {@code get(cacheName, key)} returns {@code null},
+  * i.e. there is no entry or the entry has expired. The check and the store are two
+  * separate steps and are not atomic.
+  *
+  * @return Existing value if one is present (nothing is stored in that case),
+  *      or {@code null} if the given value was stored.
+  */
+ @Override public <K, V> V putIfAbsent(String cacheName, K key, V val, long ttl) throws IgniteCheckedException {
+     V cur = get(cacheName, key);
+
+     // Present: keep the existing mapping untouched and report it.
+     if (cur != null)
+         return cur;
+
+     // Absent: store the new value. The original condition was inverted, so it overwrote
+     // existing values and never stored anything for a missing key.
+     put(cacheName, key, val, ttl);
+
+     return null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the entry is removed from the map returned by {@code getOrCreateCache(cacheName)}.
+  * The expire time of the removed entry is not checked.
+  *
+  * @return Value of the removed entry, or {@code null} if there was no entry.
+  */
+ @Override public <K, V> V remove(String cacheName, K key) throws IgniteCheckedException {
+     assert cacheName != null;
+     assert key != null;
+
+     Map<K, CachedObject<V>> entries = getOrCreateCache(cacheName);
+
+     CachedObject<V> old = entries.remove(key);
+
+     if (old == null)
+         return null;
+
+     return old.obj;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Here the key is considered present when {@code get(cacheName, key)} returns a non-null
+  * value. A checked exception from {@code get} is treated as "not present".
+  */
+ @Override public <K> boolean containsKey(String cacheName, K key) {
+     assert cacheName != null;
+     assert key != null;
+
+     try {
+         return get(cacheName, key) != null;
+     }
+     catch (IgniteCheckedException ignored) {
+         // Failure to read is reported as absence of the key.
+         return false;
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation does nothing.
+  */
+ @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
+     @Nullable ClassLoader ldr) throws IgniteCheckedException {
+     // No-op.
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation reads nothing and always returns {@code null}.
+  */
+ @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr)
+     throws IgniteCheckedException {
+     return null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation reads nothing and always returns {@code null}.
+  */
+ @Override public <T> T readFromOffheap(String spaceName, int part, Object key, byte[] keyBytes,
+     @Nullable ClassLoader ldr) throws IgniteCheckedException {
+     return null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation removes nothing and always returns {@code false}.
+  */
+ @Override public boolean removeFromOffheap(@Nullable String spaceName, int part, Object key,
+     @Nullable byte[] keyBytes) throws IgniteCheckedException {
+     return false;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation does nothing.
+  */
+ @Override public void writeToOffheap(@Nullable String spaceName, int part, Object key,
+     @Nullable byte[] keyBytes, Object val, @Nullable byte[] valBytes, @Nullable ClassLoader ldr)
+     throws IgniteCheckedException {
+     /* No-op. */
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation ignores its arguments and always returns {@code -1}.
+  */
+ @Override public int partition(String cacheName, Object key) {
+     return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeFromSwap(String spaceName, Object key,
+ @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation performs no checks and always returns {@code null}.
+  */
+ @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) {
+     return null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation writes nothing to the buffer and always returns {@code false}.
+  */
+ @Override public boolean writeDelta(UUID nodeId, Object msg, ByteBuffer buf) {
+     return false;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation reads nothing from the buffer and always returns {@code false}.
+  */
+ @Override public boolean readDelta(UUID nodeId, Class<?> msgCls, ByteBuffer buf) {
+     return false;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation always returns an empty collection.
+  */
+ @Override public Collection<GridSecuritySubject> authenticatedSubjects() throws IgniteCheckedException {
+     return Collections.emptyList();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation ignores the subject ID and always returns {@code null}.
+  */
+ @Override public GridSecuritySubject authenticatedSubject(UUID subjId) throws IgniteCheckedException {
+     return null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This implementation reads nothing and always returns {@code null}.
+  */
+ @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName,
+     Object key, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+     return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns a new anonymous factory on every call. Its {@code create(byte)} method
+ * delegates to {@code GridTcpCommunicationMessageFactory.create(type)}.
+ */
- @Override public GridTcpMessageFactory messageFactory() {
- return new GridTcpMessageFactory() {
++ @Override public MessageFactory messageFactory() {
++ return new MessageFactory() {
+ @Override public GridTcpCommunicationMessageAdapter create(byte type) {
+ return GridTcpCommunicationMessageFactory.create(type);
+ }
+ };
+ }
+
+ /**
+  * Returns the map registered in {@code cache} under the given name, creating and
+  * registering a new {@code ConcurrentHashMap} if there is none. Lookup and creation
+  * happen while holding the monitor of {@code cache}.
+  *
+  * @param cacheName Cache name.
+  * @return Map representing cache.
+  */
+ @SuppressWarnings("unchecked")
+ private <K, V> Map<K, V> getOrCreateCache(String cacheName) {
+     synchronized (cache) {
+         Map<K, V> existing = cache.get(cacheName);
+
+         if (existing != null)
+             return existing;
+
+         Map<K, V> created = new ConcurrentHashMap<>();
+
+         cache.put(cacheName, created);
+
+         return created;
+     }
+ }
+
+ /**
+  * Immutable holder pairing a cached value with its expire time.
+  *
+  * @param <V> Type of the cached value.
+  */
+ private static class CachedObject<V> {
+     /** Expire time compared against {@code System.currentTimeMillis()}; {@code 0} means no expiration. */
+     private final long expire;
+
+     /** Cached value. */
+     private final V obj;
+
+     /**
+      * @param expire Expire time.
+      * @param obj Object.
+      */
+     private CachedObject(long expire, V obj) {
+         this.expire = expire;
+         this.obj = obj;
+     }
+ }
+ }