You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:15 UTC
[35/50] [abbrv] ignite git commit: IGNITE-3220 I/O bottleneck on
server/client cluster configuration Communications optimizations: -
possibility to open separate in/out connections - possibility to have
multiple connections between nodes - implemented NI
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c8e2e0b..bc1f173 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -44,6 +45,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -63,6 +66,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -86,11 +90,14 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPER
*
*/
public class GridNioServer<T> {
+ /** */
+ public static final String IGNITE_IO_BALANCE_RANDOM_BALANCE = "IGNITE_IO_BALANCE_RANDOM_BALANCER";
+
/** Default session write timeout. */
public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
/** Default send queue limit. */
- public static final int DFLT_SEND_QUEUE_LIMIT = 1024;
+ public static final int DFLT_SEND_QUEUE_LIMIT = 0;
/** Time, which server will wait before retry operation. */
private static final long ERR_WAIT_TIME = 2000;
@@ -122,6 +129,9 @@ public class GridNioServer<T> {
}
}
+ /** Defines how many times selector should do {@code selectNow()} before doing {@code select(long)}. */
+ private long selectorSpins;
+
/** Accept worker thread. */
@GridToStringExclude
private final IgniteThread acceptThread;
@@ -145,9 +155,13 @@ public class GridNioServer<T> {
/** Flag indicating if this server should use direct buffers. */
private final boolean directBuf;
- /** Index to select which thread will serve next socket channel. Using round-robin balancing. */
+ /** Index to select which thread will serve next incoming socket channel. Using round-robin balancing. */
+ @GridToStringExclude
+ private int readBalanceIdx;
+
+ /** Index to select which thread will serve next out socket channel. Using round-robin balancing. */
@GridToStringExclude
- private int balanceIdx;
+ private int writeBalanceIdx = 1;
/** Tcp no delay flag. */
private final boolean tcpNoDelay;
@@ -204,12 +218,25 @@ public class GridNioServer<T> {
/** Optional listener to monitor outbound message queue size. */
private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+ /** */
+ private final AtomicLong readerMoveCnt = new AtomicLong();
+
+ /** */
+ private final AtomicLong writerMoveCnt = new AtomicLong();
+
+ /** */
+ private final IgniteRunnable balancer;
+
/**
* @param addr Address.
* @param port Port.
* @param log Log.
* @param selectorCnt Count of selectors and selecting threads.
* @param gridName Grid name.
+ * @param srvName Logical server name for threads identification.
+ * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
* @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
* @param directBuf Direct buffer flag.
* @param order Byte order.
@@ -223,6 +250,7 @@ public class GridNioServer<T> {
* @param writerFactory Writer factory.
* @param skipRecoveryPred Skip recovery predicate.
* @param msgQueueLsnr Message queue size listener.
+ * @param balancing NIO sessions balancing flag.
* @param filters Filters for this server.
* @throws IgniteCheckedException If failed.
*/
@@ -232,6 +260,8 @@ public class GridNioServer<T> {
IgniteLogger log,
int selectorCnt,
@Nullable String gridName,
+ @Nullable String srvName,
+ long selectorSpins,
boolean tcpNoDelay,
boolean directBuf,
ByteOrder order,
@@ -245,6 +275,7 @@ public class GridNioServer<T> {
GridNioMessageWriterFactory writerFactory,
IgnitePredicate<Message> skipRecoveryPred,
IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
+ boolean balancing,
GridNioFilter... filters
) throws IgniteCheckedException {
if (port != -1)
@@ -268,6 +299,7 @@ public class GridNioServer<T> {
this.sockSndBuf = sockSndBuf;
this.sndQueueLimit = sndQueueLimit;
this.msgQueueLsnr = msgQueueLsnr;
+ this.selectorSpins = selectorSpins;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
@@ -299,9 +331,16 @@ public class GridNioServer<T> {
clientThreads = new IgniteThread[selectorCnt];
for (int i = 0; i < selectorCnt; i++) {
+ String threadName;
+
+ if (srvName == null)
+ threadName = "grid-nio-worker-" + i;
+ else
+ threadName = "grid-nio-worker-" + srvName + "-" + i;
+
AbstractNioClientWorker worker = directMode ?
- new DirectNioClientWorker(i, gridName, "grid-nio-worker-" + i, log) :
- new ByteBufferNioClientWorker(i, gridName, "grid-nio-worker-" + i, log);
+ new DirectNioClientWorker(i, gridName, threadName, log) :
+ new ByteBufferNioClientWorker(i, gridName, threadName, log);
clientWorkers.add(worker);
@@ -315,6 +354,32 @@ public class GridNioServer<T> {
this.writerFactory = writerFactory;
this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
+
+ long balancePeriod = IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, 5000);
+
+ IgniteRunnable balancer0 = null;
+
+ if (balancing && balancePeriod > 0) {
+ boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false);
+
+ balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod);
+ }
+
+ this.balancer = balancer0;
+ }
+
+ /**
+ * @return Number of reader sessions move.
+ */
+ public long readerMoveCount() {
+ return readerMoveCnt.get();
+ }
+
+ /**
+ * @return Number of reader writer move.
+ */
+ public long writerMoveCount() {
+ return writerMoveCnt.get();
}
/**
@@ -377,6 +442,13 @@ public class GridNioServer<T> {
}
/**
+ * @return Selector spins.
+ */
+ public long selectorSpins() {
+ return selectorSpins;
+ }
+
+ /**
* @param ses Session to close.
* @return Future for operation.
*/
@@ -390,7 +462,7 @@ public class GridNioServer<T> {
NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
- clientWorkers.get(impl.selectorIndex()).offer(fut);
+ impl.offerStateChange(fut);
return fut;
}
@@ -398,61 +470,91 @@ public class GridNioServer<T> {
/**
* @param ses Session.
* @param msg Message.
+ * @param createFut {@code True} if future should be created.
* @return Future for operation.
*/
- GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) {
- assert ses instanceof GridSelectorNioSessionImpl;
+ GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException {
+ assert ses instanceof GridSelectorNioSessionImpl : ses;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
- NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+ if (createFut) {
+ NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
- send0(impl, fut, false);
+ send0(impl, fut, false);
- return fut;
+ return fut;
+ }
+ else {
+ SessionWriteRequest req = new WriteRequestImpl(ses, msg, true);
+
+ send0(impl, req, false);
+
+ return null;
+ }
}
/**
* @param ses Session.
* @param msg Message.
+ * @param createFut {@code True} if future should be created.
* @return Future for operation.
*/
- GridNioFuture<?> send(GridNioSession ses, Message msg) {
+ GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException {
assert ses instanceof GridSelectorNioSessionImpl;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
- NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
- skipRecoveryPred.apply(msg));
+ if (createFut) {
+ NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+ skipRecoveryPred.apply(msg));
- send0(impl, fut, false);
+ send0(impl, fut, false);
- return fut;
+ return fut;
+ }
+ else {
+ SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg));
+
+ send0(impl, req, false);
+
+ return null;
+ }
}
/**
* @param ses Session.
- * @param fut Future.
+ * @param req Request.
* @param sys System message flag.
+ * @throws IgniteCheckedException If session was closed.
*/
- private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) {
+ private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException {
assert ses != null;
- assert fut != null;
+ assert req != null;
- int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+ int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
IgniteInClosure<IgniteException> ackC;
if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
- fut.ackClosure(ackC);
+ req.ackClosure(ackC);
if (ses.closed()) {
- if (ses.removeFuture(fut))
- fut.connectionClosed();
+ if (ses.removeFuture(req)) {
+ IOException err = new IOException("Failed to send message (connection was closed): " + ses);
+
+ req.onError(err);
+
+ if (!(req instanceof GridNioFuture))
+ throw new IgniteCheckedException(err);
+ }
+ }
+ else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) {
+ AbstractNioClientWorker worker = (AbstractNioClientWorker)ses.worker();
+
+ if (worker != null)
+ worker.offer((SessionChangeRequest)req);
}
- else if (msgCnt == 1)
- // Change from 0 to 1 means that worker thread should be waken up.
- clientWorkers.get(ses.selectorIndex()).offer(fut);
if (msgQueueLsnr != null)
msgQueueLsnr.apply(ses, msgCnt);
@@ -463,10 +565,10 @@ public class GridNioServer<T> {
*
* @param ses Session.
* @param msg Message.
- * @return Future.
+ * @throws IgniteCheckedException If session was closed.
*/
- public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg) {
- return sendSystem(ses, msg, null);
+ public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException {
+ sendSystem(ses, msg, null);
}
/**
@@ -475,27 +577,30 @@ public class GridNioServer<T> {
* @param ses Session.
* @param msg Message.
* @param lsnr Future listener notified from the session thread.
- * @return Future.
+ * @throws IgniteCheckedException If session was closed.
*/
- public GridNioFuture<?> sendSystem(GridNioSession ses,
+ public void sendSystem(GridNioSession ses,
Message msg,
- @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
+ @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException {
assert ses instanceof GridSelectorNioSessionImpl;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
- NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
- skipRecoveryPred.apply(msg));
-
if (lsnr != null) {
+ NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+ skipRecoveryPred.apply(msg));
+
fut.listen(lsnr);
assert !fut.isDone();
- }
- send0(impl, fut, true);
+ send0(impl, fut, true);
+ }
+ else {
+ SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
- return fut;
+ send0(impl, req, true);
+ }
}
/**
@@ -504,37 +609,69 @@ public class GridNioServer<T> {
public void resend(GridNioSession ses) {
assert ses instanceof GridSelectorNioSessionImpl;
- GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+ GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
- if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
- Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
+ if (recoveryDesc != null && !recoveryDesc.messagesRequests().isEmpty()) {
+ Deque<SessionWriteRequest> futs = recoveryDesc.messagesRequests();
if (log.isDebugEnabled())
log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
- GridNioFuture<?> fut0 = futs.iterator().next();
+ SessionWriteRequest fut0 = futs.iterator().next();
- for (GridNioFuture<?> fut : futs) {
+ for (SessionWriteRequest fut : futs) {
fut.messageThread(true);
- ((NioOperationFuture)fut).resetSession(ses0);
+ fut.resetSession(ses0);
}
ses0.resend(futs);
// Wake up worker.
- clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+ ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
}
}
/**
+ * @return Sessions.
+ */
+ public Collection<? extends GridNioSession> sessions() {
+ return sessions;
+ }
+
+ /**
+ * @return Workers.
+ */
+ public List<AbstractNioClientWorker> workers() {
+ return clientWorkers;
+ }
+
+ /**
+ * @param ses Session.
+ * @param from Move from index.
+ * @param to Move to index.
+ */
+ private void moveSession(GridNioSession ses, int from, int to) {
+ assert from >= 0 && from < clientWorkers.size() : from;
+ assert to >= 0 && to < clientWorkers.size() : to;
+ assert from != to;
+
+ GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+ SessionMoveFuture fut = new SessionMoveFuture(ses0, to);
+
+ if (!ses0.offerMove(clientWorkers.get(from), fut))
+ fut.onDone(false);
+ }
+
+ /**
* @param ses Session.
* @param op Operation.
* @return Future for operation.
*/
- GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
+ private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
assert ses instanceof GridSelectorNioSessionImpl;
assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
@@ -546,7 +683,7 @@ public class GridNioServer<T> {
NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
- clientWorkers.get(impl.selectorIndex()).offer(fut);
+ impl.offerStateChange(fut);
return fut;
}
@@ -555,6 +692,9 @@ public class GridNioServer<T> {
*
*/
public void dumpStats() {
+ U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
+ ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
+
for (int i = 0; i < clientWorkers.size(); i++)
clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
}
@@ -675,12 +815,35 @@ public class GridNioServer<T> {
* @param req Request to balance.
*/
private synchronized void offerBalanced(NioOperationFuture req) {
- clientWorkers.get(balanceIdx).offer(req);
+ assert req.operation() == NioOperation.REGISTER : req;
+ assert req.socketChannel() != null : req;
+
+ int workers = clientWorkers.size();
+
+ int balanceIdx;
+
+ if (workers > 1) {
+ if (req.accepted()) {
+ balanceIdx = readBalanceIdx;
+
+ readBalanceIdx += 2;
+
+ if (readBalanceIdx >= workers)
+ readBalanceIdx = 0;
+ }
+ else {
+ balanceIdx = writeBalanceIdx;
- balanceIdx++;
+ writeBalanceIdx += 2;
- if (balanceIdx == clientWorkers.size())
+ if (writeBalanceIdx >= workers)
+ writeBalanceIdx = 1;
+ }
+ }
+ else
balanceIdx = 0;
+
+ clientWorkers.get(balanceIdx).offer(req);
}
/** {@inheritDoc} */
@@ -792,21 +955,30 @@ public class GridNioServer<T> {
while (true) {
ByteBuffer buf = ses.removeMeta(BUF_META_KEY);
- NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+ SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
// Check if there were any pending data from previous writes.
if (buf == null) {
assert req == null;
- req = (NioOperationFuture<?>)ses.pollFuture();
+ req = ses.pollFuture();
if (req == null) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ if (ses.procWrite.get()) {
+ ses.procWrite.set(false);
+
+ if (ses.writeQueue().isEmpty()) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ else
+ ses.procWrite.set(true);
+ }
break;
}
- buf = req.message();
+ buf = (ByteBuffer)req.message();
}
if (!skipWrite) {
@@ -841,10 +1013,15 @@ public class GridNioServer<T> {
// Message was successfully written.
assert req != null;
- req.onDone();
+ req.onMessageWritten();
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ByteBufferNioClientWorker.class, this, super.toString());
+ }
}
/**
@@ -909,6 +1086,7 @@ public class GridNioServer<T> {
metricsLsnr.onBytesReceived(cnt);
ses.bytesReceived(cnt);
+ onRead(cnt);
readBuf.flip();
@@ -921,6 +1099,12 @@ public class GridNioServer<T> {
readBuf.compact();
else
readBuf.clear();
+
+ if (ses.hasSystemMessage() && !ses.procWrite.get()) {
+ ses.procWrite.set(true);
+
+ registerWrite(ses);
+ }
}
catch (IgniteCheckedException e) {
close(ses, e);
@@ -993,16 +1177,29 @@ public class GridNioServer<T> {
if (ses.meta(WRITE_BUF_LIMIT) != null)
buf.limit((int)ses.meta(WRITE_BUF_LIMIT));
- NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+ SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
while (true) {
if (req == null) {
- req = (NioOperationFuture<?>)ses.pollFuture();
+ req = systemMessage(ses);
- if (req == null && buf.position() == 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ if (req == null) {
+ req = ses.pollFuture();
- break;
+ if (req == null && buf.position() == 0) {
+ if (ses.procWrite.get()) {
+ ses.procWrite.set(false);
+
+ if (ses.writeQueue().isEmpty()) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ else
+ ses.procWrite.set(true);
+ }
+
+ break;
+ }
}
}
@@ -1010,7 +1207,7 @@ public class GridNioServer<T> {
boolean finished = false;
if (req != null) {
- msg = req.directMessage();
+ msg = (Message)req.message();
assert msg != null;
@@ -1025,14 +1222,17 @@ public class GridNioServer<T> {
// Fill up as many messages as possible to write buffer.
while (finished) {
- req.onDone();
+ req.onMessageWritten();
- req = (NioOperationFuture<?>)ses.pollFuture();
+ req = systemMessage(ses);
+
+ if (req == null)
+ req = ses.pollFuture();
if (req == null)
break;
- msg = req.directMessage();
+ msg = (Message)req.message();
assert msg != null;
@@ -1129,13 +1329,31 @@ public class GridNioServer<T> {
ses.bytesSent(cnt);
if (!buf.hasRemaining())
- queue.remove(buf);
+ queue.poll();
else
break;
}
}
/**
+ * @param ses Session.
+ * @return System message request.
+ */
+ private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) {
+ if (ses.hasSystemMessage()) {
+ Object msg = ses.systemMessage();
+
+ SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
+
+ assert !ses.hasSystemMessage();
+
+ return req;
+ }
+
+ return null;
+ }
+
+ /**
* Processes write-ready event on the key.
*
* @param key Key that is ready to be written.
@@ -1147,7 +1365,7 @@ public class GridNioServer<T> {
GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
ByteBuffer buf = ses.writeBuffer();
- NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+ SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
@@ -1161,12 +1379,25 @@ public class GridNioServer<T> {
}
if (req == null) {
- req = (NioOperationFuture<?>)ses.pollFuture();
+ req = systemMessage(ses);
- if (req == null && buf.position() == 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ if (req == null) {
+ req = ses.pollFuture();
- return;
+ if (req == null && buf.position() == 0) {
+ if (ses.procWrite.get()) {
+ ses.procWrite.set(false);
+
+ if (ses.writeQueue().isEmpty()) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ else
+ ses.procWrite.set(true);
+ }
+
+ return;
+ }
}
}
@@ -1174,9 +1405,9 @@ public class GridNioServer<T> {
boolean finished = false;
if (req != null) {
- msg = req.directMessage();
+ msg = (Message)req.message();
- assert msg != null;
+ assert msg != null : req;
if (writer != null)
writer.setCurrentWriteClass(msg.getClass());
@@ -1189,14 +1420,17 @@ public class GridNioServer<T> {
// Fill up as many messages as possible to write buffer.
while (finished) {
- req.onDone();
+ req.onMessageWritten();
- req = (NioOperationFuture<?>)ses.pollFuture();
+ req = systemMessage(ses);
+
+ if (req == null)
+ req = ses.pollFuture();
if (req == null)
break;
- msg = req.directMessage();
+ msg = (Message)req.message();
assert msg != null;
@@ -1223,6 +1457,7 @@ public class GridNioServer<T> {
metricsLsnr.onBytesSent(cnt);
ses.bytesSent(cnt);
+ onWrite(cnt);
}
else {
// For test purposes only (skipWrite is set to true in tests only).
@@ -1242,14 +1477,19 @@ public class GridNioServer<T> {
else
buf.clear();
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DirectNioClientWorker.class, this, super.toString());
+ }
}
/**
* Thread performing only read operations from the channel.
*/
- private abstract class AbstractNioClientWorker extends GridWorker {
+ private abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker {
/** Queue of change requests on this selector. */
- private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>();
/** Selector to select read events. */
private Selector selector;
@@ -1260,6 +1500,25 @@ public class GridNioServer<T> {
/** Worker index. */
private final int idx;
+ /** */
+ private long bytesRcvd;
+
+ /** */
+ private long bytesSent;
+
+ /** */
+ private volatile long bytesRcvd0;
+
+ /** */
+ private volatile long bytesSent0;
+
+ /** Sessions assigned to this worker. */
+ private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
+ new GridConcurrentHashSet<>();
+
+ /** {@code True} if worker has called or is about to call {@code Selector.select()}. */
+ private volatile boolean select;
+
/**
* @param idx Index of this worker in server's array.
* @param gridName Grid name.
@@ -1322,15 +1581,15 @@ public class GridNioServer<T> {
try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
- Class<?> selectorImplClass =
+ Class<?> selectorImplCls =
Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
// Ensure the current selector implementation is what we can instrument.
- if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+ if (!selectorImplCls.isAssignableFrom(selector.getClass()))
return;
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+ Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys");
+ Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
@@ -1357,48 +1616,126 @@ public class GridNioServer<T> {
*
* @param req Change request.
*/
- private void offer(NioOperationFuture req) {
+ public void offer(SessionChangeRequest req) {
changeReqs.offer(req);
+ if (select)
+ selector.wakeup();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void offer(Collection<SessionChangeRequest> reqs) {
+ for (SessionChangeRequest req : reqs)
+ changeReqs.offer(req);
+
selector.wakeup();
}
+ /** {@inheritDoc} */
+ @Override public List<SessionChangeRequest> clearSessionRequests(GridNioSession ses) {
+ List<SessionChangeRequest> sesReqs = null;
+
+ for (SessionChangeRequest changeReq : changeReqs) {
+ if (changeReq.session() == ses && !(changeReq instanceof SessionMoveFuture)) {
+ boolean rmv = changeReqs.remove(changeReq);
+
+ assert rmv : changeReq;
+
+ if (sesReqs == null)
+ sesReqs = new ArrayList<>();
+
+ sesReqs.add(changeReq);
+ }
+ }
+
+ return sesReqs;
+ }
+
/**
* Processes read and write events and registration requests.
*
* @throws IgniteCheckedException If IOException occurred or thread was unable to add worker to workers pool.
*/
@SuppressWarnings("unchecked")
- private void bodyInternal() throws IgniteCheckedException {
+ private void bodyInternal() throws IgniteCheckedException, InterruptedException {
try {
long lastIdleCheck = U.currentTimeMillis();
+ mainLoop:
while (!closed && selector.isOpen()) {
- NioOperationFuture req;
+ SessionChangeRequest req0;
- while ((req = changeReqs.poll()) != null) {
- switch (req.operation()) {
+ while ((req0 = changeReqs.poll()) != null) {
+ switch (req0.operation()) {
case REGISTER: {
- register(req);
+ register((NioOperationFuture)req0);
break;
}
- case REQUIRE_WRITE: {
- //Just register write key.
- SelectionKey key = req.session().key();
+ case MOVE: {
+ SessionMoveFuture f = (SessionMoveFuture)req0;
- if (key.isValid()) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ GridSelectorNioSessionImpl ses = f.session();
+
+ if (idx == f.toIdx) {
+ assert f.movedSocketChannel() != null : f;
+
+ boolean add = workerSessions.add(ses);
+
+ assert add;
- // Update timestamp to protected against false write timeout.
- ((GridNioSessionImpl)key.attachment()).bytesSent(0);
+ ses.finishMoveSession(this);
+
+ if (idx % 2 == 0)
+ readerMoveCnt.incrementAndGet();
+ else
+ writerMoveCnt.incrementAndGet();
+
+ SelectionKey key = f.movedSocketChannel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+ ses);
+
+ ses.key(key);
+
+ ses.procWrite.set(true);
+
+ f.onDone(true);
+ }
+ else {
+ assert f.movedSocketChannel() == null : f;
+
+ if (workerSessions.remove(ses)) {
+ ses.startMoveSession(this);
+
+ SelectionKey key = ses.key();
+
+ assert key.channel() != null : key;
+
+ f.movedSocketChannel((SocketChannel)key.channel());
+
+ key.cancel();
+
+ clientWorkers.get(f.toIndex()).offer(f);
+ }
+ else
+ f.onDone(false);
}
break;
}
+ case REQUIRE_WRITE: {
+ SessionWriteRequest req = (SessionWriteRequest)req0;
+
+ registerWrite((GridSelectorNioSessionImpl)req.session());
+
+ break;
+ }
+
case CLOSE: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
if (close(req.session(), null))
req.onDone(true);
else
@@ -1408,6 +1745,8 @@ public class GridNioServer<T> {
}
case PAUSE_READ: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
SelectionKey key = req.session().key();
if (key.isValid()) {
@@ -1426,6 +1765,8 @@ public class GridNioServer<T> {
}
case RESUME_READ: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
SelectionKey key = req.session().key();
if (key.isValid()) {
@@ -1444,76 +1785,66 @@ public class GridNioServer<T> {
}
case DUMP_STATS: {
- StringBuilder sb = new StringBuilder();
+ NioOperationFuture req = (NioOperationFuture)req0;
- Set<SelectionKey> keys = selector.keys();
-
- sb.append(U.nl())
- .append(">> Selector info [idx=").append(idx)
- .append(", keysCnt=").append(keys.size())
- .append("]").append(U.nl());
-
- for (SelectionKey key : keys) {
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
- MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
- MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
-
- sb.append(" Connection info [")
- .append("rmtAddr=").append(ses.remoteAddress())
- .append(", locAddr=").append(ses.localAddress());
-
- GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
+ try {
+ dumpStats();
+ }
+ finally {
+ // Complete the request just in case (none should wait on this future).
+ req.onDone(true);
+ }
+ }
+ }
+ }
- if (desc != null) {
- sb.append(", msgsSent=").append(desc.sent())
- .append(", msgsAckedByRmt=").append(desc.acked())
- .append(", msgsRcvd=").append(desc.received())
- .append(", descIdHash=").append(System.identityHashCode(desc));
- }
- else
- sb.append(", recoveryDesc=null");
+ int res = 0;
- sb.append(", bytesRcvd=").append(ses.bytesReceived())
- .append(", bytesSent=").append(ses.bytesSent())
- .append(", opQueueSize=").append(ses.writeQueueSize())
- .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
- .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+ for (long i = 0; i < selectorSpins && res == 0; i++) {
+ res = selector.selectNow();
- int cnt = 0;
+ if (res > 0) {
+ // Walk through the ready keys collection and process network events.
+ if (selectedKeys == null)
+ processSelectedKeys(selector.selectedKeys());
+ else
+ processSelectedKeysOptimized(selectedKeys.flip());
+ }
- for (GridNioFuture<?> fut : ses.writeQueue()) {
- if (cnt == 0)
- sb.append(",\n opQueue=[").append(fut);
- else
- sb.append(',').append(fut);
+ if (!changeReqs.isEmpty())
+ continue mainLoop;
- if (++cnt == 5) {
- sb.append(']');
+ // Just in case we do busy selects.
+ long now = U.currentTimeMillis();
- break;
- }
- }
+ if (now - lastIdleCheck > 2000) {
+ lastIdleCheck = now;
+ checkIdle(selector.keys());
+ }
- sb.append("]").append(U.nl());
- }
+ if (isCancelled())
+ return;
+ }
- U.warn(log, sb.toString());
+ // Falling to blocking select.
+ select = true;
- // Complete the request just in case (none should wait on this future).
- req.onDone(true);
- }
+ try {
+ if (!changeReqs.isEmpty())
+ continue;
+
+ // Wake up every 2 seconds to check if closed.
+ if (selector.select(2000) > 0) {
+ // Walk through the ready keys collection and process network events.
+ if (selectedKeys == null)
+ processSelectedKeys(selector.selectedKeys());
+ else
+ processSelectedKeysOptimized(selectedKeys.flip());
}
}
-
- // Wake up every 2 seconds to check if closed.
- if (selector.select(2000) > 0) {
- // Walk through the ready keys collection and process network events.
- if (selectedKeys == null)
- processSelectedKeys(selector.selectedKeys());
- else
- processSelectedKeysOptimized(selectedKeys.flip());
+ finally {
+ select = false;
}
long now = U.currentTimeMillis();
@@ -1554,6 +1885,98 @@ public class GridNioServer<T> {
}
/**
+ * @param ses Session.
+ */
+ public final void registerWrite(GridSelectorNioSessionImpl ses) {
+ SelectionKey key = ses.key();
+
+ if (key.isValid()) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+
+ // Update timestamp to protected against false write timeout.
+ ses.bytesSent(0);
+ }
+ }
+
+ /**
+ *
+ */
+ private void dumpStats() {
+ StringBuilder sb = new StringBuilder();
+
+ Set<SelectionKey> keys = selector.keys();
+
+ sb.append(U.nl())
+ .append(">> Selector info [idx=").append(idx)
+ .append(", keysCnt=").append(keys.size())
+ .append(", bytesRcvd=").append(bytesRcvd)
+ .append(", bytesRcvd0=").append(bytesRcvd0)
+ .append(", bytesSent=").append(bytesSent)
+ .append(", bytesSent0=").append(bytesSent0)
+ .append("]").append(U.nl());
+
+ for (SelectionKey key : keys) {
+ GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+ MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
+ sb.append(" Connection info [")
+ .append("in=").append(ses.accepted())
+ .append(", rmtAddr=").append(ses.remoteAddress())
+ .append(", locAddr=").append(ses.localAddress());
+
+ GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+
+ if (outDesc != null) {
+ sb.append(", msgsSent=").append(outDesc.sent())
+ .append(", msgsAckedByRmt=").append(outDesc.acked())
+ .append(", descIdHash=").append(System.identityHashCode(outDesc));
+ }
+ else
+ sb.append(", outRecoveryDesc=null");
+
+ GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+
+ if (inDesc != null) {
+ sb.append(", msgsRcvd=").append(inDesc.received())
+ .append(", lastAcked=").append(inDesc.lastAcknowledged())
+ .append(", descIdHash=").append(System.identityHashCode(inDesc));
+ }
+ else
+ sb.append(", inRecoveryDesc=null");
+
+ sb.append(", bytesRcvd=").append(ses.bytesReceived())
+ .append(", bytesRcvd0=").append(ses.bytesReceived0())
+ .append(", bytesSent=").append(ses.bytesSent())
+ .append(", bytesSent0=").append(ses.bytesSent0())
+ .append(", opQueueSize=").append(ses.writeQueueSize())
+ .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+ .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+ int cnt = 0;
+
+ for (SessionWriteRequest req : ses.writeQueue()) {
+ if (cnt == 0)
+ sb.append(",\n opQueue=[").append(req);
+ else
+ sb.append(',').append(req);
+
+ if (++cnt == 5) {
+ sb.append(']');
+
+ break;
+ }
+ }
+
+ sb.append("]").append(U.nl());
+ }
+
+ U.warn(log, sb.toString());
+ }
+
+ /**
* Processes keys selected by a selector.
*
* @param keys Selected keys.
@@ -1671,7 +2094,9 @@ public class GridNioServer<T> {
long idleTimeout0 = idleTimeout;
- if (!opWrite && now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
+ if (!opWrite &&
+ now - ses.lastReceiveTime() > idleTimeout0 &&
+ now - ses.lastSendScheduleTime() > idleTimeout0) {
filterChain.onSessionIdleTimeout(ses);
// Update timestamp to avoid multiple notifications within one timeout interval.
@@ -1715,7 +2140,7 @@ public class GridNioServer<T> {
final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(
log,
- idx,
+ this,
filterChain,
(InetSocketAddress)sockCh.getLocalAddress(),
(InetSocketAddress)sockCh.getRemoteAddress(),
@@ -1739,6 +2164,7 @@ public class GridNioServer<T> {
resend(ses);
sessions.add(ses);
+ workerSessions.add(ses);
try {
filterChain.onSessionOpened(ses);
@@ -1764,7 +2190,7 @@ public class GridNioServer<T> {
}
/**
- * Closes the ses and all associated resources, then notifies the listener.
+ * Closes the session and all associated resources, then notifies the listener.
*
* @param ses Session to be closed.
* @param e Exception to be passed to the listener, if any.
@@ -1781,12 +2207,10 @@ public class GridNioServer<T> {
}
sessions.remove(ses);
+ workerSessions.remove(ses);
SelectionKey key = ses.key();
- // Shutdown input and output so that remote client will see correct socket close.
- Socket sock = ((SocketChannel)key.channel()).socket();
-
if (ses.setClosed()) {
ses.onClosed();
@@ -1798,6 +2222,9 @@ public class GridNioServer<T> {
((DirectBuffer)ses.readBuffer()).cleaner().clean();
}
+ // Shutdown input and output so that remote client will see correct socket close.
+ Socket sock = ((SocketChannel)key.channel()).socket();
+
try {
try {
sock.shutdownInput();
@@ -1824,28 +2251,35 @@ public class GridNioServer<T> {
ses.removeMeta(BUF_META_KEY);
// Since ses is in closed state, no write requests will be added.
- NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
+ SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
+
+ GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor();
+ GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
- GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+ IOException err = new IOException("Failed to send message (connection was closed): " + ses);
- if (recovery != null) {
+ if (outRecovery != null || inRecovery != null) {
try {
// Poll will update recovery data.
- while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
- if (fut.skipRecovery())
- fut.connectionClosed();
+ while ((req = ses.pollFuture()) != null) {
+ if (req.skipRecovery())
+ req.onError(err);
}
}
finally {
- recovery.release();
+ if (outRecovery != null)
+ outRecovery.release();
+
+ if (inRecovery != null && inRecovery != outRecovery)
+ inRecovery.release();
}
}
else {
- if (fut != null)
- fut.connectionClosed();
+ if (req != null)
+ req.onError(err);
- while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
- fut.connectionClosed();
+ while ((req = ses.pollFuture()) != null)
+ req.onError(err);
}
try {
@@ -1876,12 +2310,44 @@ public class GridNioServer<T> {
* @throws IOException If write failed.
*/
protected abstract void processWrite(SelectionKey key) throws IOException;
- }
- /**
- * Gets outbound messages queue size.
- *
- * @return Write queue size.
+ /**
+ * @param cnt
+ */
+ final void onRead(int cnt) {
+ bytesRcvd += cnt;
+ bytesRcvd0 += cnt;
+ }
+
+ /**
+ * @param cnt
+ */
+ final void onWrite(int cnt) {
+ bytesSent += cnt;
+ bytesSent0 += cnt;
+ }
+
+ /**
+ *
+ */
+ final void reset0() {
+ bytesSent0 = 0;
+ bytesRcvd0 = 0;
+
+ for (GridSelectorNioSessionImpl ses : workerSessions)
+ ses.reset0();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AbstractNioClientWorker.class, this, super.toString());
+ }
+ }
+
+ /**
+ * Gets outbound messages queue size.
+ *
+ * @return Write queue size.
*/
public int outboundMessagesQueueSize() {
int res = 0;
@@ -1952,6 +2418,9 @@ public class GridNioServer<T> {
if (selector.select(2000) > 0)
// Walk through the ready keys collection and process date requests.
processSelectedKeys(selector.selectedKeys());
+
+ if (balancer != null)
+ balancer.run();
}
}
// Ignore this exception as thread interruption is equal to 'close' call.
@@ -2048,10 +2517,13 @@ public class GridNioServer<T> {
/**
* Asynchronous operation that may be requested on selector.
*/
- private enum NioOperation {
+ enum NioOperation {
/** Register read key selection. */
REGISTER,
+ /** Move session between workers. */
+ MOVE,
+
/** Register write key selection. */
REQUIRE_WRITE,
@@ -2069,9 +2541,193 @@ public class GridNioServer<T> {
}
/**
+ *
+ */
+ private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest {
+ /** */
+ private final Object msg;
+
+ /** */
+ private final GridNioSession ses;
+
+ /**
+ * @param ses Session.
+ * @param msg Message.
+ */
+ WriteRequestSystemImpl(GridNioSession ses, Object msg) {
+ this.ses = ses;
+ this.msg = msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void messageThread(boolean msgThread) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean messageThread() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipRecovery() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInClosure<IgniteException> ackClosure() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(Exception e) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageWritten() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resetSession(GridNioSession ses) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridNioSession session() {
+ return ses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public NioOperation operation() {
+ return NioOperation.REQUIRE_WRITE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(WriteRequestSystemImpl.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest {
+ /** */
+ private GridNioSession ses;
+
+ /** */
+ private final Object msg;
+
+ /** */
+ private boolean msgThread;
+
+ /** */
+ private final boolean skipRecovery;
+
+ /** */
+ private IgniteInClosure<IgniteException> ackC;
+
+ /**
+ * @param ses Session.
+ * @param msg Message.
+ * @param skipRecovery Skip recovery flag.
+ */
+ WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) {
+ this.ses = ses;
+ this.msg = msg;
+ this.skipRecovery = skipRecovery;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void messageThread(boolean msgThread) {
+ this.msgThread = msgThread;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean messageThread() {
+ return msgThread;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipRecovery() {
+ return skipRecovery;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+ ackC = c;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ assert msg instanceof Message;
+
+ ((Message)msg).onAckReceived();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInClosure<IgniteException> ackClosure() {
+ return ackC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(Exception e) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageWritten() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resetSession(GridNioSession ses) {
+ this.ses = ses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridNioSession session() {
+ return ses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public NioOperation operation() {
+ return NioOperation.REQUIRE_WRITE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(WriteRequestImpl.class, this);
+ }
+ }
+
+ /**
* Class for requesting write and session close operations.
*/
- private static class NioOperationFuture<R> extends GridNioFutureImpl<R> {
+ private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest,
+ SessionChangeRequest {
/** */
private static final long serialVersionUID = 0L;
@@ -2087,11 +2743,7 @@ public class GridNioServer<T> {
private NioOperation op;
/** Message. */
- @GridToStringExclude
- private ByteBuffer msg;
-
- /** Direct message. */
- private Message commMsg;
+ private Object msg;
/** */
@GridToStringExclude
@@ -2153,8 +2805,7 @@ public class GridNioServer<T> {
* @param op Requested operation.
* @param msg Message.
*/
- NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
- ByteBuffer msg) {
+ NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) {
assert ses != null;
assert op != null;
assert op != NioOperation.REGISTER;
@@ -2182,51 +2833,36 @@ public class GridNioServer<T> {
this.ses = ses;
this.op = op;
- this.commMsg = commMsg;
+ this.msg = commMsg;
this.skipRecovery = skipRecovery;
}
- /**
- * @return Requested change operation.
- */
- private NioOperation operation() {
+ /** {@inheritDoc} */
+ public NioOperation operation() {
return op;
}
- /**
- * @return Message.
- */
- private ByteBuffer message() {
+ /** {@inheritDoc} */
+ public Object message() {
return msg;
}
- /**
- * @return Direct message.
- */
- private Message directMessage() {
- return commMsg;
- }
-
- /**
- * @param ses New session instance.
- */
- private void resetSession(GridSelectorNioSessionImpl ses) {
- assert commMsg != null;
+ /** {@inheritDoc} */
+ public void resetSession(GridNioSession ses) {
+ assert msg instanceof Message : msg;
- this.ses = ses;
+ this.ses = (GridSelectorNioSessionImpl)ses;
}
/**
* @return Socket channel for register request.
*/
- private SocketChannel socketChannel() {
+ SocketChannel socketChannel() {
return sockCh;
}
- /**
- * @return Session for this change request.
- */
- private GridSelectorNioSessionImpl session() {
+ /** {@inheritDoc} */
+ public GridSelectorNioSessionImpl session() {
return ses;
}
@@ -2244,21 +2880,21 @@ public class GridNioServer<T> {
return meta;
}
- /**
- * Applicable to write futures only. Fails future with corresponding IOException.
- */
- private void connectionClosed() {
- assert op == NioOperation.REQUIRE_WRITE;
- assert ses != null;
-
- onDone(new IOException("Failed to send message (connection was closed): " + ses));
+ /** {@inheritDoc} */
+ @Override public void onError(Exception e) {
+ onDone(e);
}
/** {@inheritDoc} */
@Override public void onAckReceived() {
- assert commMsg != null;
+ assert msg instanceof Message : msg;
- commMsg.onAckReceived();
+ ((Message)msg).onAckReceived();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageWritten() {
+ onDone();
}
/** {@inheritDoc} */
@@ -2273,6 +2909,59 @@ public class GridNioServer<T> {
}
/**
+ *
+ */
+ private static class SessionMoveFuture extends NioOperationFuture<Boolean> {
+ /** */
+ private final int toIdx;
+
+ /** */
+ @GridToStringExclude
+ private SocketChannel movedSockCh;
+
+ /**
+ * @param ses Session.
+ * @param toIdx Target worker index.
+ */
+ SessionMoveFuture(
+ GridSelectorNioSessionImpl ses,
+ int toIdx
+ ) {
+ super(ses, NioOperation.MOVE);
+
+ this.toIdx = toIdx;
+ }
+
+ /**
+ * @return Target worker index.
+ */
+ int toIndex() {
+ return toIdx;
+ }
+
+ /**
+ * @return Moved session socket channel.
+ */
+ SocketChannel movedSocketChannel() {
+ return movedSockCh;
+ }
+
+ /**
+ * @param movedSockCh Moved session socket channel.
+ */
+ void movedSocketChannel(SocketChannel movedSockCh) {
+ assert movedSockCh != null;
+
+ this.movedSockCh = movedSockCh;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SessionMoveFuture.class, this, super.toString());
+ }
+ }
+
+ /**
* Filter forwarding messages from chain's head to this server.
*/
private class HeadFilter extends GridNioFilterAdapter {
@@ -2302,7 +2991,7 @@ public class GridNioServer<T> {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
if (directMode) {
boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
@@ -2313,18 +3002,18 @@ public class GridNioServer<T> {
queue.offer((ByteBuffer)msg);
- SelectionKey key = ((GridSelectorNioSessionImpl)ses).key();
+ GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
- if (key.isValid())
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ if (!ses0.procWrite.get() && ses0.procWrite.compareAndSet(false, true))
+ ses0.worker().registerWrite(ses0);
return null;
}
else
- return send(ses, (Message)msg);
+ return send(ses, (Message)msg, fut);
}
else
- return send(ses, (ByteBuffer)msg);
+ return send(ses, (ByteBuffer)msg, fut);
}
/** {@inheritDoc} */
@@ -2429,6 +3118,15 @@ public class GridNioServer<T> {
/** Message queue size listener. */
private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+ /** Name for threads identification. */
+ private String srvName;
+
+ /** */
+ private long selectorSpins;
+
+ /** NIO sessions balancing flag. */
+ private boolean balancing;
+
/**
* Finishes building the instance.
*
@@ -2442,6 +3140,8 @@ public class GridNioServer<T> {
log,
selectorCnt,
gridName,
+ srvName,
+ selectorSpins,
tcpNoDelay,
directBuf,
byteOrder,
@@ -2455,6 +3155,7 @@ public class GridNioServer<T> {
writerFactory,
skipRecoveryPred,
msgQueueLsnr,
+ balancing,
filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
);
@@ -2468,6 +3169,16 @@ public class GridNioServer<T> {
}
/**
+ * @param balancing NIO sessions balancing flag.
+ * @return This for chaining.
+ */
+ public Builder<T> balancing(boolean balancing) {
+ this.balancing = balancing;
+
+ return this;
+ }
+
+ /**
* @param addr Local address.
* @return This for chaining.
*/
@@ -2519,6 +3230,28 @@ public class GridNioServer<T> {
}
/**
+ * @param srvName Logical server name for threads identification.
+ * @return This for chaining.
+ */
+ public Builder<T> serverName(@Nullable String srvName) {
+ this.srvName = srvName;
+
+ return this;
+ }
+
+ /**
+ * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ * @return This for chaining.
+ */
+ public Builder<T> selectorSpins(long selectorSpins) {
+ this.selectorSpins = selectorSpins;
+
+ return this;
+ }
+
+ /**
* @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
* @return This for chaining.
*/
@@ -2678,4 +3411,225 @@ public class GridNioServer<T> {
return this;
}
}
+
+ /**
+ *
+ */
+ private class SizeBasedBalancer implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long lastBalance;
+
+ /** */
+ private final long balancePeriod;
+
+ /**
+ * @param balancePeriod Period.
+ */
+ SizeBasedBalancer(long balancePeriod) {
+ this.balancePeriod = balancePeriod;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ long now = U.currentTimeMillis();
+
+ if (lastBalance + balancePeriod < now) {
+ lastBalance = now;
+
+ long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
+ int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1;
+
+ for (int i = 0; i < clientWorkers.size(); i++) {
+ GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+ int sesCnt = worker.workerSessions.size();
+
+ if (i % 2 == 0) {
+ // Reader.
+ long bytesRcvd0 = worker.bytesRcvd0;
+
+ if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 && sesCnt > 1) {
+ maxRcvd0 = bytesRcvd0;
+ maxRcvdIdx = i;
+ }
+
+ if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) {
+ minRcvd0 = bytesRcvd0;
+ minRcvdIdx = i;
+ }
+ }
+ else {
+ // Writer.
+ long bytesSent0 = worker.bytesSent0;
+
+ if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 && sesCnt > 1) {
+ maxSent0 = bytesSent0;
+ maxSentIdx = i;
+ }
+
+ if (minSent0 == -1 || bytesSent0 < minSent0) {
+ minSent0 = bytesSent0;
+ minSentIdx = i;
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx +
+ ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+ ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+ ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
+
+ if (maxSent0 != -1 && minSent0 != -1) {
+ GridSelectorNioSessionImpl ses = null;
+
+ long sentDiff = maxSent0 - minSent0;
+ long delta = sentDiff;
+ double threshold = sentDiff * 0.9;
+
+ GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+ clientWorkers.get(maxSentIdx).workerSessions;
+
+ for (GridSelectorNioSessionImpl ses0 : sessions) {
+ long bytesSent0 = ses0.bytesSent0();
+
+ if (bytesSent0 < threshold &&
+ (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) {
+ ses = ses0;
+ delta = U.safeAbs(bytesSent0 - sentDiff / 2);
+ }
+ }
+
+ if (ses != null) {
+ if (log.isDebugEnabled())
+ log.debug("Will move session to less loaded writer [ses=" + ses +
+ ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+ moveSession(ses, maxSentIdx, minSentIdx);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Unable to find session to move for writers.");
+ }
+ }
+
+ if (maxRcvd0 != -1 && minRcvd0 != -1) {
+ GridSelectorNioSessionImpl ses = null;
+
+ long rcvdDiff = maxRcvd0 - minRcvd0;
+ long delta = rcvdDiff;
+ double threshold = rcvdDiff * 0.9;
+
+ GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+ clientWorkers.get(maxRcvdIdx).workerSessions;
+
+ for (GridSelectorNioSessionImpl ses0 : sessions) {
+ long bytesRcvd0 = ses0.bytesReceived0();
+
+ if (bytesRcvd0 < threshold &&
+ (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) {
+ ses = ses0;
+ delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2);
+ }
+ }
+
+ if (ses != null) {
+ if (log.isDebugEnabled())
+ log.debug("Will move session to less loaded reader [ses=" + ses +
+ ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+ moveSession(ses, maxRcvdIdx, minRcvdIdx);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Unable to find session to move for readers.");
+ }
+ }
+
+ for (int i = 0; i < clientWorkers.size(); i++) {
+ GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+ worker.reset0();
+ }
+ }
+ }
+ }
+
+ /**
+ * For tests only.
+ */
+ @SuppressWarnings("unchecked")
+ private class RandomBalancer implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int w1 = rnd.nextInt(clientWorkers.size());
+
+ if (clientWorkers.get(w1).workerSessions.isEmpty())
+ return;
+
+ int w2 = rnd.nextInt(clientWorkers.size());
+
+ while (w2 == w1)
+ w2 = rnd.nextInt(clientWorkers.size());
+
+ GridNioSession ses = randomSession(clientWorkers.get(w1));
+
+ if (ses != null) {
+ log.info("Move session [from=" + w1 +
+ ", to=" + w2 +
+ ", ses=" + ses + ']');
+
+ moveSession(ses, w1, w2);
+ }
+ }
+
+ /**
+ * @param worker Worker.
+ * @return NIO session.
+ */
+ private GridNioSession randomSession(GridNioServer.AbstractNioClientWorker worker) {
+ Collection<GridNioSession> sessions = worker.workerSessions;
+
+ int size = sessions.size();
+
+ if (size == 0)
+ return null;
+
+ int idx = ThreadLocalRandom.current().nextInt(size);
+
+ Iterator<GridNioSession> it = sessions.iterator();
+
+ int cnt = 0;
+
+ while (it.hasNext()) {
+ GridNioSession ses = it.next();
+
+ if (cnt == idx)
+ return ses;
+ }
+
+ return null;
+ }
+
+ }
+
+ /**
+ *
+ */
+ interface SessionChangeRequest {
+ GridNioSession session();
+
+ /**
+ * @return Requested change operation.
+ */
+ NioOperation operation();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index e4a7225..c1b60ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
import org.jetbrains.annotations.Nullable;
/**
@@ -105,6 +106,11 @@ public interface GridNioSession {
public GridNioFuture<?> send(Object msg);
/**
+ * @param msg Message to be sent.
+ */
+ public void sendNoFuture(Object msg) throws IgniteCheckedException;
+
+ /**
* Gets metadata associated with specified key.
*
* @param key Key to look up.
@@ -158,10 +164,25 @@ public interface GridNioSession {
/**
* @param recoveryDesc Recovery descriptor.
*/
- public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+ public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+ /**
+ * @param recoveryDesc Recovery descriptor.
+ */
+ public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
/**
* @return Recovery descriptor if recovery is supported, {@code null otherwise.}
*/
- @Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
+ @Nullable public GridNioRecoveryDescriptor outRecoveryDescriptor();
+
+ /**
+ * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
+ */
+ @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
+
+ /**
+ * @param msg System message to send.
+ */
+ public void systemMessage(Object msg);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 0bcfe64..7424531 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -51,6 +51,12 @@ public class GridNioSessionImpl implements GridNioSession {
/** Received bytes counter. */
private volatile long bytesRcvd;
+ /** Sent bytes since last NIO sessions balancing. */
+ private volatile long bytesSent0;
+
+ /** Received bytes since last NIO sessions balancing. */
+ private volatile long bytesRcvd0;
+
/** Last send schedule timestamp. */
private volatile long sndSchedTime;
@@ -99,7 +105,7 @@ public class GridNioSessionImpl implements GridNioSession {
try {
resetSendScheduleTime();
- return chain().onSessionWrite(this, msg);
+ return chain().onSessionWrite(this, msg, true);
}
catch (IgniteCheckedException e) {
close();
@@ -109,6 +115,18 @@ public class GridNioSessionImpl implements GridNioSession {
}
/** {@inheritDoc} */
+ @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+ try {
+ chain().onSessionWrite(this, msg, false);
+ }
+ catch (IgniteCheckedException e) {
+ close();
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public GridNioFuture<?> resumeReads() {
try {
return chain().onResumeReads(this);
@@ -163,6 +181,28 @@ public class GridNioSessionImpl implements GridNioSession {
return bytesRcvd;
}
+ /**
+ * @return Sent bytes since last NIO sessions balancing.
+ */
+ public long bytesSent0() {
+ return bytesSent0;
+ }
+
+ /**
+ * @return Received bytes since last NIO sessions balancing.
+ */
+ public long bytesReceived0() {
+ return bytesRcvd0;
+ }
+
+ /**
+ *
+ */
+ public void reset0() {
+ bytesSent0 = 0;
+ bytesRcvd0 = 0;
+ }
+
/** {@inheritDoc} */
@Override public long createTime() {
return createTime;
@@ -240,6 +280,7 @@ public class GridNioSessionImpl implements GridNioSession {
*/
public void bytesSent(int cnt) {
bytesSent += cnt;
+ bytesSent0 += cnt;
lastSndTime = U.currentTimeMillis();
}
@@ -253,6 +294,7 @@ public class GridNioSessionImpl implements GridNioSession {
*/
public void bytesReceived(int cnt) {
bytesRcvd += cnt;
+ bytesRcvd0 += cnt;
lastRcvTime = U.currentTimeMillis();
}
@@ -296,17 +338,32 @@ public class GridNioSessionImpl implements GridNioSession {
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
return null;
}
/** {@inheritDoc} */
+ @Override public void systemMessage(Object msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNioSessionImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
new file mode 100644
index 0000000..62985ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import java.util.Collection;
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+interface GridNioWorker {
+ /**
+ * @param req Change request.
+ */
+ public void offer(GridNioServer.SessionChangeRequest req);
+
+ /**
+ * @param reqs Change requests.
+ */
+ public void offer(Collection<GridNioServer.SessionChangeRequest> reqs);
+
+ /**
+ * @param ses Session.
+ * @return Session state change requests.
+ */
+ @Nullable public List<GridNioServer.SessionChangeRequest> clearSessionRequests(GridNioSession ses);
+
+ /**
+ * @param ses Session to register write interest for.
+ */
+ public void registerWrite(GridSelectorNioSessionImpl ses);
+}