You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:30 UTC
[07/50] [abbrv] ignite git commit: IGNITE-3054 - Rework message
handling.
IGNITE-3054 - Rework message handling.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/661efc21
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/661efc21
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/661efc21
Branch: refs/heads/ignite-3054
Commit: 661efc218038bef89c0ded7ec0c48a6631df8110
Parents: 2c8ee96
Author: dkarachentsev <dk...@gridgain.com>
Authored: Thu Nov 17 13:21:08 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Thu Nov 17 13:21:08 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 459 +++++--------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 66 ++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 33 +-
3 files changed, 203 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/661efc21/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2a471c4..734e538 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -90,11 +90,15 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
+import org.apache.ignite.internal.util.nio.GridBufferedParser;
+import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
+import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
@@ -155,7 +159,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -204,12 +207,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Node ID in GridNioSession. */
private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
- /** Not fully read message in GridNioSession. */
- private static final int INCOMPLETE_MESSAGE_META = GridNioSessionMetaKey.nextUniqueKey();
-
- /** Not fully read message length. */
- private static final int MESSAGE_LEN_META = GridNioSessionMetaKey.nextUniqueKey();
-
/**
* Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
* <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
@@ -286,11 +283,70 @@ class ServerImpl extends TcpDiscoveryImpl {
new ConcurrentHashMap8<>();
/** Nio server that serves client connections. */
- private GridNioServer clientNioSrv;
+ private GridNioServer<byte[]> clientNioSrv;
/** List of nio workers. */
private Set<ClientNioMessageWorker> nioWorkers = new CopyOnWriteArraySet<>();
+ /**
+ * Listener of client NIO connections. Disconnects stop the worker bound to the session; incoming
+ * messages are handed to {@code nioClientProcessingPool} so that the NIO thread is released.
+ */
+ private final GridNioServerListener<byte[]> clientLsnr = new GridNioServerListenerAdapter<byte[]>() {
+ /** Processes incoming NIO client messages. */
+ private final ClientNioMessageProcessor msgProc = new ClientNioMessageProcessor(log);
+
+ /** {@inheritDoc} */
+ @Override public void onConnected(final GridNioSession ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(final GridNioSession ses, @Nullable final Exception e) {
+ final UUID clientNodeId = msgProc.clientNodeId(ses);
+
+ if (log.isDebugEnabled())
+ log.debug("Stopping message worker on disconnect [remoteAddr=" + ses.remoteAddress() +
+ ", remote node ID=" + clientNodeId + ']');
+
+ final ClientMessageProcessor proc = clientMsgWorkers.get(clientNodeId);
+
+ // instanceof is false for null, so no separate null check is needed.
+ if (proc instanceof ClientNioMessageWorker && ((ClientNioMessageWorker)proc).ses == ses) {
+ // Stop the worker only if this call actually removed it from the map.
+ if (clientMsgWorkers.remove(clientNodeId, proc))
+ ((ClientNioMessageWorker)proc).nonblockingStop();
+ }
+ else {
+ // Log at the same level the guard checks (was log.error under isDebugEnabled()).
+ if (log.isDebugEnabled())
+ log.debug("Illegal ClientMessageProcessor: " + proc);
+
+ if (ses.closeTime() == 0)
+ ses.close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(final GridNioSession ses, final byte[] msg) {
+ // Release nio thread.
+ nioClientProcessingPool.submit(new Runnable() {
+ @Override public void run() {
+ try {
+ msgProc.processMessage(ses, msg);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failure processing message, closing connection. [ses=" + ses + ']', e);
+
+ final UUID nodeId = ses.meta(NODE_ID_META);
+
+ // Tolerate a session without a node ID instead of passing null to the map.
+ final ClientMessageProcessor proc = nodeId != null ? clientMsgWorkers.get(nodeId) : null;
+
+ if (proc instanceof ClientNioMessageWorker && ((ClientNioMessageWorker)proc).ses == ses) {
+ // Mirror onDisconnected(): removing the worker without stopping it would leave
+ // it (and presumably its session) alive although the log says the connection is closed.
+ if (clientMsgWorkers.remove(nodeId, proc))
+ ((ClientNioMessageWorker)proc).nonblockingStop();
+ }
+ else if (ses.closeTime() == 0)
+ ses.close(); // No worker owns this session: close it directly.
+ }
+ }
+ });
+ }
+ };
+
/**
* @param adapter Adapter.
*/
@@ -419,19 +475,36 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @return New NIO server.
*/
- private GridNioServer createClientNioServer() {
- final GridNioServer srv;
+ private GridNioServer<byte[]> createClientNioServer() {
+ final GridNioServer<byte[]> srv;
final ArrayList<GridNioFilter> filters = new ArrayList<>();
- filters.add(new ClientNioFilter("marshallerFilter"));
+ filters.add(new GridNioCodecFilter(new GridBufferedParser(
+ spi.isClientDirectBuffer(), spi.getClientByteOrder()) {
+ /**
+ * Encodes an outgoing message as its raw bytes, without a length prefix.
+ *
+ * @param ses NIO session (not used by this implementation).
+ * @param msg Outgoing message, cast to {@code byte[]}.
+ * @return Buffer ready for reading that contains exactly the message bytes.
+ */
+ @Override public ByteBuffer encode(final GridNioSession ses,
+ final Object msg) throws IOException, IgniteCheckedException {
+ // Respond to client without message length.
+ final byte[] msg0 = (byte[])msg;
+
+ // Honor the configured buffer kind: in heap mode wrap the array instead of
+ // paying for a direct allocation and a copy on every outgoing message.
+ if (!spi.isClientDirectBuffer())
+ return ByteBuffer.wrap(msg0);
+
+ final ByteBuffer res = ByteBuffer.allocateDirect(msg0.length);
+
+ res.put(msg0);
+
+ res.flip();
+
+ return res;
+ }
+ }, log, false));
+
+ filters.add(new GridConnectionBytesVerifyFilter(log));
if (spi.isSslEnabled()) {
if (spi.sslCtx != null) {
GridNioSslFilter sslFilter = new GridNioSslFilter(
spi.sslCtx,
- true,
- ByteOrder.nativeOrder(),
+ spi.isClientDirectBuffer(),
+ spi.getClientByteOrder(),
log
);
@@ -448,21 +521,21 @@ class ServerImpl extends TcpDiscoveryImpl {
if (Thread.currentThread() instanceof IgniteThread)
gridName = ((IgniteThread)Thread.currentThread()).getGridName();
- srv = GridNioServer.builder().address(U.getLocalHost())
+ srv = GridNioServer.<byte[]>builder().address(U.getLocalHost())
.port(-1)
- .listener(new ClientNioListener<>(log, writeTimeout))
+ .listener(clientLsnr)
.filters(filters.toArray(new GridNioFilter[filters.size()]))
.logger(log)
.selectorCount(spi.getClientNioThreads())
.sendQueueLimit(spi.getClientSendMessageQueueLimit())
- .byteOrder(ByteOrder.nativeOrder())
+ .byteOrder(spi.getClientByteOrder())
.tcpNoDelay(spi.getTcpNodelay())
- .directBuffer(true)
+ .directBuffer(spi.isClientDirectBuffer())
.directMode(false)
.socketReceiveBufferSize(0)
.socketSendBufferSize(0)
.idleTimeout(Long.MAX_VALUE)
- .gridName("nio-" + gridName)
+ .gridName(gridName)
.daemon(false)
.writeTimeout(writeTimeout)
.build();
@@ -5662,7 +5735,18 @@ class ServerImpl extends TcpDiscoveryImpl {
* @return Send future.
*/
GridNioFuture<?> addReceipt(final int receipt) {
- return spi.sendMessage(ses, null, new byte[]{(byte) receipt});
+ try {
+ return spi.sendMessage(ses, null, new byte[]{(byte) receipt});
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed marshal message, closing connection. [receipt=" + receipt + ", ses=" + ses + ']', e);
+
+ nonblockingStop();
+
+ clientMsgWorkers.remove(clientNodeId, this);
+
+ return new GridNioFinishedFuture<>(e);
+ }
}
/**
@@ -5705,7 +5789,16 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgBytes Message bytes to send.
*/
public void sendMessage(final TcpDiscoveryAbstractMessage msg, @Nullable final byte[] msgBytes) {
- spi.sendMessage(ses, msg, msgBytes);
+ try {
+ spi.sendMessage(ses, msg, msgBytes);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed marshal message, closing connection. [msg=" + msg + ", ses=" + ses + ']', e);
+
+ nonblockingStop();
+
+ clientMsgWorkers.remove(clientNodeId, this);
+ }
}
/** {@inheritDoc} */
@@ -5760,93 +5853,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- /**
- * Listener for client messages.
- */
- private class ClientNioListener<T> implements GridNioServerListener<T> {
- /** */
- private final IgniteLogger log;
-
- /** */
- private final long writeTimeout;
-
- /** */
- private final ClientNioMessageProcessor<T> msgProc;
-
- /**
- * @param log Logger.
- * @param writeTimeout Socket write timeout.
- */
- private ClientNioListener(final IgniteLogger log, final long writeTimeout) {
- this.log = log;
- this.writeTimeout = writeTimeout;
-
- msgProc = new ClientNioMessageProcessor<>(log);
- }
-
- /** {@inheritDoc} */
- @Override public void onConnected(final GridNioSession ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onDisconnected(final GridNioSession ses, @Nullable final Exception e) {
- final UUID clientNodeId = msgProc.clientNodeId(ses);
-
- if (log.isDebugEnabled())
- log.debug("Stopping message worker on disconnect [remoteAddr=" + ses.remoteAddress() +
- ", remote node ID=" + clientNodeId + ']');
- final ClientMessageProcessor proc = clientMsgWorkers.get(clientNodeId);
-
- if (proc != null && proc instanceof ClientNioMessageWorker && ((ClientNioMessageWorker)proc).ses == ses) {
- if (clientMsgWorkers.remove(clientNodeId, proc))
- ((ClientNioMessageWorker)proc).nonblockingStop();
- }
- else {
- if (log.isDebugEnabled())
- log.error("Illegal ClientMessageProcessor: " + proc);
-
- if (ses.closeTime() == 0)
- ses.close();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onMessage(final GridNioSession ses, final T msg0) {
- // Release nio thread.
- nioClientProcessingPool.submit(new Runnable() {
- @Override public void run() {
- msgProc.processMessage(ses, msg0);
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionWriteTimeout(final GridNioSession ses) {
- final UUID clientNodeId = msgProc.clientNodeId(ses);
-
- if (log.isDebugEnabled())
- log.debug("Stopping message worker on write timeout [remoteAddr=" + ses.remoteAddress() +
- ", writeTimeout=" + writeTimeout + ", remote node ID=" + clientNodeId + ']');
-
- final ClientMessageProcessor proc = clientMsgWorkers.remove(clientNodeId);
-
- stopClientProcessor(proc, false);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionIdleTimeout(final GridNioSession ses) {
- // No-op.
- }
- }
/**
* Processes incoming nio client messages.
- *
- * @param <T> Message type.
*/
- private class ClientNioMessageProcessor<T> {
+ private class ClientNioMessageProcessor {
/** */
private final IgniteLogger log;
@@ -5863,7 +5875,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param ses Nio session.
* @param msg0 Incoming message.
*/
- void processMessage(GridNioSession ses, T msg0) {
+ void processMessage(GridNioSession ses, byte[] msg0) throws IgniteCheckedException {
+ final TcpDiscoveryAbstractMessage msg = spi.marshaller().unmarshal(msg0,
+ U.resolveClassLoader(spi.ignite().configuration()));
+
final UUID nodeId = getConfiguredNodeId();
final UUID clientNodeId = clientNodeId(ses);
@@ -5872,7 +5887,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (clientMsgWrk == null || clientMsgWrk.ses != ses) {
if (log.isDebugEnabled())
log.debug("NIO Worker has been closed, drop message. [clientNodeId="
- + clientNodeId + ", message=" + msg0 + ", clientMsgWrk=" + clientMsgWrk + "]");
+ + clientNodeId + ", message=" + msg + ", clientMsgWrk=" + clientMsgWrk + "]");
if (ses.closeTime() == 0)
ses.close();
@@ -5880,8 +5895,6 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- final TcpDiscoveryAbstractMessage msg = (TcpDiscoveryAbstractMessage)msg0;
-
msg.senderNodeId(nodeId);
if (log.isDebugEnabled())
@@ -6103,183 +6116,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- *
- */
- private class ClientNioFilter extends GridNioFilterAdapter {
- /**
- * @param name Name.
- */
- ClientNioFilter(final String name) {
- super(name);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionOpened(final GridNioSession ses) throws IgniteCheckedException {
- proceedSessionOpened(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionClosed(final GridNioSession ses) throws IgniteCheckedException {
- final UUID clientId = ses.meta(NODE_ID_META);
-
- if (clientId != null) {
- final ClientNioMessageWorker proc = (ClientNioMessageWorker)clientMsgWorkers.get(clientId);
-
- if (proc != null && proc.ses == ses && clientMsgWorkers.remove(clientId, proc))
- proc.nonblockingStop();
- }
-
- proceedSessionClosed(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onExceptionCaught(final GridNioSession ses,
- final IgniteCheckedException ex) throws IgniteCheckedException {
- proceedExceptionCaught(ses, ex);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(final GridNioSession ses,
- final Object msg) throws IgniteCheckedException {
- final byte[] bytes = msg instanceof byte[] ? (byte[]) msg : spi.marshaller().marshal(msg);
-
- return proceedSessionWrite(ses, ByteBuffer.wrap(bytes));
- }
-
- /** {@inheritDoc} */
- @Override public void onMessageReceived(final GridNioSession ses,
- final Object msg) throws IgniteCheckedException {
- ByteBuffer msgBuf = ses.meta(INCOMPLETE_MESSAGE_META);
-
- final ByteBuffer buf = (ByteBuffer)msg;
-
- ByteBufferInputStream in = null;
-
- if (msgBuf == null || msgBuf.position() == 0) {
- // first packet
- final int msgLen = getMessageLength(buf, ses);
-
- if (msgLen == -1)
- return;
-
- if (msgLen <= buf.remaining())
- in = new ByteBufferInputStream(buf, msgLen, false);
- else {
- msgBuf = enlargeIfNeed(msgBuf, msgLen);
-
- ses.addMeta(INCOMPLETE_MESSAGE_META, msgBuf);
- }
- }
-
- if (in == null) {
- final int oldLim = buf.limit();
-
- if (buf.remaining() > msgBuf.remaining())
- buf.limit(buf.position() + msgBuf.remaining());
-
- msgBuf.put(buf);
-
- buf.limit(oldLim);
-
- if (!msgBuf.hasRemaining()) {
- msgBuf.rewind();
-
- in = new ByteBufferInputStream(msgBuf, msgBuf.limit(), true);
- }
- }
-
- if (in != null) {
- // unmarshal and process
- final Object obj;
-
- try {
- obj = spi.marshaller().unmarshal(in,
- U.resolveClassLoader(spi.ignite().configuration()));
- }
- finally {
- U.closeQuiet(in);
- }
-
- proceedMessageReceived(ses, obj);
-
- // There are left bytes not processed
- while (buf.hasRemaining())
- onMessageReceived(ses, msg);
- }
- }
-
- /**
- * @param buf Byte buffer.
- * @param len Length.
- * @return New buffer.
- */
- private ByteBuffer enlargeIfNeed(ByteBuffer buf, int len) {
- assert buf == null || buf.position() == 0 : buf;
-
- if (buf == null || buf.capacity() < len)
- buf = ByteBuffer.allocateDirect(len);
-
- buf.limit(len);
-
- return buf;
- }
-
- /**
- * @param buf Input buffer.
- * @return Message length or -1 if not all bytes of length were read.
- */
- private int getMessageLength(final ByteBuffer buf, final GridNioSession ses) {
- ByteBuffer lenBuf = ses.meta(MESSAGE_LEN_META);
-
- int len = -1;
-
- if (lenBuf != null || buf.remaining() < 4) {
- if (lenBuf == null) {
- lenBuf = ByteBuffer.allocate(4);
-
- ses.addMeta(MESSAGE_LEN_META, lenBuf);
- }
-
- buf.get(lenBuf.array(), lenBuf.position(), Math.min(buf.remaining(), lenBuf.remaining()));
-
- if (lenBuf.remaining() == 0) {
- lenBuf.order(ByteOrder.BIG_ENDIAN);
-
- len = lenBuf.getInt();
-
- ses.removeMeta(MESSAGE_LEN_META);
- }
- }
- else {
- final ByteOrder curOrder = buf.order();
-
- buf.order(ByteOrder.BIG_ENDIAN);
-
- len = buf.getInt();
-
- buf.order(curOrder);
- }
-
- return len;
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<Boolean> onSessionClose(final GridNioSession ses) throws IgniteCheckedException {
- return proceedSessionClose(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionIdleTimeout(final GridNioSession ses) throws IgniteCheckedException {
- proceedSessionIdleTimeout(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionWriteTimeout(final GridNioSession ses) throws IgniteCheckedException {
- proceedSessionWriteTimeout(ses);
- }
- }
-
- /**
* Thread that reads messages from the socket created for incoming connections.
*/
private class SocketReader extends IgniteSpiThread {
@@ -7910,61 +7746,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- *
- */
- private static class ByteBufferInputStream extends InputStream {
- /** */
- private final ByteBuffer buf;
-
- /** */
- private final int oldLimit;
-
- /** */
- private final boolean clearOnClose;
-
- /**
- * @param buf Byte buffer.
- * @param len Length that will be set as buf.limit(buf.position() + len).
- * @param clearOnClose Clear buffer on close.
- */
- private ByteBufferInputStream(final ByteBuffer buf, final int len, final boolean clearOnClose) {
- this.buf = buf;
- this.oldLimit = buf.limit();
- this.clearOnClose = clearOnClose;
-
- buf.limit(buf.position() + len);
- }
-
- /** {@inheritDoc} */
- @Override public int read() throws IOException {
- if (!buf.hasRemaining())
- return -1;
-
- return buf.get() & 0xFF;
- }
-
- /** {@inheritDoc} */
- @Override public int read(@NotNull final byte[] b, final int off, final int len) throws IOException {
- int toRead = Math.min(len, buf.remaining());
-
- if (toRead == 0)
- return -1;
-
- buf.get(b, off, toRead);
-
- return toRead;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IOException {
- if (clearOnClose)
- buf.clear();
- else
- buf.limit(oldLimit);
- }
- }
-
- /**
* Actually marshals client nio messages to release ring worker from that routine.
*/
private class NioSendWorker extends GridWorker {
http://git-wip-us.apache.org/repos/asf/ignite/blob/661efc21/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index d648af1..19e5249 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -30,6 +30,7 @@ import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
@@ -294,6 +295,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Default max number of messages that could be queued to send to client. (value is <tt>0</tt>). */
public static final int DFLT_CLIENT_SEND_MSG_QUEUE_LIMIT = 0;
+ /** Default flag selecting direct ({@code true}) or heap ({@code false}) buffers for NIO client connections (value is <tt>true</tt>). */
+ public static final boolean DFLT_CLIENT_NIO_DIRECT_BUF = true;
+
+ /** Default byte order for NIO client buffers (value is <tt>ByteOrder.nativeOrder()</tt>). */
+ public static final ByteOrder DFLT_CLIENT_NIO_BYTE_ORDER = ByteOrder.nativeOrder();
+
/** Local address. */
protected String locAddr;
@@ -397,6 +404,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Max number of messages that could be queued to send to client. */
protected int clientSndMsgQueueLimit = DFLT_CLIENT_SEND_MSG_QUEUE_LIMIT;
+ /** Whether NIO client connections use direct ({@code true}) or heap ({@code false}) buffers. */
+ protected boolean clientNioDirectBuf = DFLT_CLIENT_NIO_DIRECT_BUF;
+
+ /** Byte order for NIO client buffers. */
+ protected ByteOrder clientNioByteOrder = DFLT_CLIENT_NIO_BYTE_ORDER;
+
/** Node authenticator. */
protected DiscoverySpiNodeAuthenticator nodeAuth;
@@ -891,6 +904,55 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
+ * Gets the flag indicating whether direct or heap buffers are used for NIO client
+ * connections.
+ * <p>
+ * Defaults to {@link #DFLT_CLIENT_NIO_DIRECT_BUF}.
+ * </p>
+ *
+ * @return {@code True} if direct buffers are used, {@code false} if heap buffers are used.
+ */
+ public boolean isClientDirectBuffer() {
+ return clientNioDirectBuf;
+ }
+
+ /**
+ * Sets the flag indicating whether direct or heap buffers are used for NIO client
+ * connections.
+ *
+ * @param directBuf {@code True} to use direct buffers, {@code false} to use heap buffers.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setClientDirectBuffer(boolean directBuf) {
+ this.clientNioDirectBuf = directBuf;
+ }
+
+ /**
+ * Gets the byte order used for NIO client buffers.
+ * <p>
+ * Defaults to {@link #DFLT_CLIENT_NIO_BYTE_ORDER}.
+ * </p>
+ *
+ * @return Byte order of NIO client buffers.
+ */
+ public ByteOrder getClientByteOrder() {
+ return clientNioByteOrder;
+ }
+
+ /**
+ * Sets the byte order used for NIO client buffers.
+ * <p>
+ * Defaults to {@link #DFLT_CLIENT_NIO_BYTE_ORDER}. The value is stored as is; this setter
+ * performs no {@code null} check.
+ * </p>
+ *
+ * @param order Byte order of NIO client buffers.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setClientByteOrder(ByteOrder order) {
+ this.clientNioByteOrder = order;
+ }
+
+ /**
* Gets IP finder for IP addresses sharing and storing.
*
* @return IP finder for IP addresses sharing and storing.
@@ -1569,7 +1631,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @return Future.
*/
protected GridNioFuture<?> sendMessage(GridNioSession ses, @Nullable TcpDiscoveryAbstractMessage msg,
- @Nullable byte[] msgBytes) {
+ @Nullable byte[] msgBytes) throws IgniteCheckedException {
assert msg != null || msgBytes != null : "Null message to send";
final GridNioFuture<?> fut;
@@ -1577,7 +1639,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
if (msgBytes != null)
fut = ses.send(msgBytes);
else
- fut = ses.send(msg);
+ fut = ses.send(marshaller().marshal(msg));
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/661efc21/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index e87b96d..78752b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2189,24 +2189,29 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
fut.onDone(e);
}
- TestTcpDiscoverySpi.super.sendMessage(ses, msg, msgBytes).chain(new C1<IgniteInternalFuture<?>, Object>() {
- private static final long serialVersionUID = 0L;
+ try {
+ TestTcpDiscoverySpi.super.sendMessage(ses, msg, msgBytes).chain(new C1<IgniteInternalFuture<?>, Object>() {
+ private static final long serialVersionUID = 0L;
- @Override public Object apply(final IgniteInternalFuture<?> igniteInternalFut) {
- try {
- final Object obj = igniteInternalFut.get();
+ @Override public Object apply(final IgniteInternalFuture<?> igniteInternalFut) {
+ try {
+ final Object obj = igniteInternalFut.get();
- fut.onDone(obj, null);
+ fut.onDone(obj, null);
- return obj;
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
- }
+ return obj;
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
- return null;
- }
- });
+ return null;
+ }
+ });
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
}
};