You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:48 UTC
[26/50] [abbrv] ignite git commit: finalizing - removed
locksupport.park in NIO server, getters/setters/javadocs
finalizing - removed locksupport.park in NIO server, getters/setters/javadocs
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70ac3bee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70ac3bee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70ac3bee
Branch: refs/heads/ignite-comm-balance-master
Commit: 70ac3beeab3355f5fbbb190ddfed1a92d1d2aa17
Parents: 6adf8a9
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Nov 15 18:00:27 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Nov 15 18:00:27 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 13 --
.../util/nio/GridNioRecoveryDescriptor.java | 15 +-
.../ignite/internal/util/nio/GridNioServer.java | 211 +++++--------------
.../communication/tcp/TcpCommunicationSpi.java | 29 ++-
.../tcp/TcpCommunicationSpiMBean.java | 10 +
5 files changed, 89 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/70ac3bee/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 8a1f557..1c7dbdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -430,19 +430,6 @@ public final class IgniteSystemProperties {
public static final String IGNITE_NO_SELECTOR_OPTS = "IGNITE_NO_SELECTOR_OPTS";
/**
- * Defines how many non-blocking {@code selector.selectNow()} should be made before
- * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
- * Can be set to {@code Long.MAX_VALUE} so selector thread will never block.
- */
- public static final String IGNITE_SELECTOR_SPINS = "IGNITE_SELECTOR_SPINS";
-
- /**
- * When {@code true} writer workers in {@code GridNioServer} should use only {@code Selector.select()},
- * otherwise use {@code LockSupport.park()} when possible. Default is {@code false}.
- */
- public static final String IGNITE_DISABLE_SELECTOR_PARK = "IGNITE_DISABLE_SELECTOR_PARK";
-
- /**
* System property to specify period in milliseconds between calls of the SQL statements cache cleanup task.
* <p>
* Cleanup tasks clears cache for terminated threads and for threads which did not perform SQL queries within
http://git-wip-us.apache.org/repos/asf/ignite/blob/70ac3bee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 867237d..7a568ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -78,20 +78,15 @@ public class GridNioRecoveryDescriptor {
/** Number of descriptor reservations (for info purposes). */
private int reserveCnt;
- /** Ack send threshold (from configuration). */
- private final int ackSndThreshold;
-
/**
* @param queueLimit Maximum size of unacknowledged messages queue.
* @param node Node.
* @param log Logger.
- * @param ackSndThreshold Ack send threshold.
*/
public GridNioRecoveryDescriptor(
int queueLimit,
ClusterNode node,
- IgniteLogger log,
- int ackSndThreshold
+ IgniteLogger log
) {
assert !node.isLocal() : node;
assert queueLimit > 0;
@@ -101,14 +96,6 @@ public class GridNioRecoveryDescriptor {
this.queueLimit = queueLimit;
this.node = node;
this.log = log;
- this.ackSndThreshold = ackSndThreshold;
- }
-
- /**
- * @return Ack send threshold.
- */
- public int ackSendThreshold() {
- return ackSndThreshold;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/70ac3bee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 8c6c897..cb693b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -47,7 +47,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -130,13 +129,8 @@ public class GridNioServer<T> {
}
}
- /** */
- private final long selectorSpins =
- IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_SELECTOR_SPINS, 0);
-
- /** */
- private final boolean disablePark =
- IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_SELECTOR_PARK, true); // TODO
+ /** Defines how many times selector should do {@code selectNow()} before doing {@code select(long)}. */
+ private long selectorSpins;
/** Accept worker thread. */
@GridToStringExclude
@@ -240,6 +234,9 @@ public class GridNioServer<T> {
* @param selectorCnt Count of selectors and selecting threads.
* @param gridName Grid name.
* @param srvName Logical server name for threads identification.
+ * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
* @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
* @param directBuf Direct buffer flag.
* @param order Byte order.
@@ -263,6 +260,7 @@ public class GridNioServer<T> {
int selectorCnt,
@Nullable String gridName,
@Nullable String srvName,
+ long selectorSpins,
boolean tcpNoDelay,
boolean directBuf,
ByteOrder order,
@@ -299,6 +297,7 @@ public class GridNioServer<T> {
this.sockSndBuf = sockSndBuf;
this.sndQueueLimit = sndQueueLimit;
this.msgQueueLsnr = msgQueueLsnr;
+ this.selectorSpins = selectorSpins;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
@@ -441,13 +440,6 @@ public class GridNioServer<T> {
}
/**
- * @return {@code True} if park is disabled.
- */
- public boolean parkDisabled() {
- return disablePark;
- }
-
- /**
* @return Selector spins.
*/
public long selectorSpins() {
@@ -974,13 +966,8 @@ public class GridNioServer<T> {
ses.procWrite.set(false);
if (ses.writeQueue().isEmpty()) {
- if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- writeSesCnt--;
-
- assert writeSesCnt >= 0;
- }
}
else
ses.procWrite.set(true);
@@ -1069,17 +1056,6 @@ public class GridNioServer<T> {
return;
}
- int pendingAcks0 = -1;
-
- GridNioRecoveryDescriptor desc = null;
-
- if (!disablePark && writer) {
- desc = ((GridSelectorNioSessionImpl)key.attachment()).outRecoveryDescriptor();
-
- if (desc != null)
- pendingAcks0 = desc.messagesRequests().size() % desc.ackSendThreshold();
- }
-
ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
@@ -1131,12 +1107,6 @@ public class GridNioServer<T> {
catch (IgniteCheckedException e) {
close(ses, e);
}
-
- if (!disablePark && pendingAcks0 != -1) {
- assert desc != null;
-
- pendingAcks -= pendingAcks0 - (desc.messagesRequests().size() % desc.ackSendThreshold());
- }
}
/**
@@ -1146,27 +1116,10 @@ public class GridNioServer<T> {
* @throws IOException If write failed.
*/
@Override protected void processWrite(SelectionKey key) throws IOException {
- int pendingAcks0 = -1;
-
- GridNioRecoveryDescriptor desc = null;
-
- if (!disablePark && writer) {
- desc = ((GridSelectorNioSessionImpl)key.attachment()).outRecoveryDescriptor();
-
- if (desc != null)
- pendingAcks0 = desc.messagesRequests().size() % desc.ackSendThreshold();
- }
-
if (sslFilter != null)
processWriteSsl(key);
else
processWrite0(key);
-
- if (!disablePark && pendingAcks0 != -1) {
- assert desc != null;
-
- pendingAcks += (desc.messagesRequests().size() % desc.ackSendThreshold()) - pendingAcks0;
- }
}
/**
@@ -1232,17 +1185,12 @@ public class GridNioServer<T> {
req = ses.pollFuture();
if (req == null && buf.position() == 0) {
- if (!this.writer || ses.procWrite.get()) {
+ if (ses.procWrite.get()) {
ses.procWrite.set(false);
if (ses.writeQueue().isEmpty()) {
- if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- writeSesCnt--;
-
- assert writeSesCnt >= 0 : writeSesCnt;
- }
}
else
ses.procWrite.set(true);
@@ -1435,17 +1383,12 @@ public class GridNioServer<T> {
req = ses.pollFuture();
if (req == null && buf.position() == 0) {
- if (!this.writer || ses.procWrite.get()) {
+ if (ses.procWrite.get()) {
ses.procWrite.set(false);
if (ses.writeQueue().isEmpty()) {
- if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- writeSesCnt--;
-
- assert writeSesCnt >= 0 : writeSesCnt;
- }
}
else
ses.procWrite.set(true);
@@ -1571,21 +1514,9 @@ public class GridNioServer<T> {
private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
new GridConcurrentHashSet<>();
- /** Writer worker flag. */
- protected final boolean writer;
-
- /** {@code True} if calls 'selector.select'. */
+ /** {@code True} if worker has called or is about to call {@code Selector.select()}. */
private volatile boolean select;
- /** {@code True} if calls 'LockSupport.park'. */
- private volatile boolean park;
-
- /** Number of sessions which require writes. */
- protected int writeSesCnt;
-
- /** */
- protected int pendingAcks;
-
/**
* @param idx Index of this worker in server's array.
* @param gridName Grid name.
@@ -1600,8 +1531,6 @@ public class GridNioServer<T> {
createSelector();
this.idx = idx;
-
- writer = idx % 2 == 1;
}
/** {@inheritDoc} */
@@ -1690,8 +1619,6 @@ public class GridNioServer<T> {
if (select)
selector.wakeup();
- else if (!disablePark && park)
- LockSupport.unpark(clientThreads[idx]);
}
/** {@inheritDoc} */
@@ -1732,6 +1659,7 @@ public class GridNioServer<T> {
try {
long lastIdleCheck = U.currentTimeMillis();
+ mainLoop:
while (!closed && selector.isOpen()) {
SessionChangeRequest req0;
@@ -1766,9 +1694,6 @@ public class GridNioServer<T> {
SelectionKey.OP_READ | SelectionKey.OP_WRITE,
ses);
- // New session is registered with OP_WRITE interest.
- writeSesCnt++;
-
ses.key(key);
ses.procWrite.set(true);
@@ -1785,13 +1710,6 @@ public class GridNioServer<T> {
assert key.channel() != null : key;
- // Cancelling key with OP_WRITE interest - need to decrement counter.
- if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
- writeSesCnt--;
-
- assert writeSesCnt >= 0 : writeSesCnt;
- }
-
f.movedSocketChannel((SocketChannel)key.channel());
key.cancel();
@@ -1878,46 +1796,36 @@ public class GridNioServer<T> {
}
}
- if (!disablePark && writer && writeSesCnt == 0 && pendingAcks == 0) {
- park = true;
-
- try {
- while (changeReqs.isEmpty()) {
- LockSupport.parkNanos(1_000_000_000);
+ int res = 0;
- if (Thread.interrupted()) {
- Thread.currentThread().interrupt();
+ for (long i = 0; i < selectorSpins && res == 0; i++) {
+ res = selector.selectNow();
- return;
- }
- }
+ if (res > 0) {
+ // Walk through the ready keys collection and process network events.
+ if (selectedKeys == null)
+ processSelectedKeys(selector.selectedKeys());
+ else
+ processSelectedKeysOptimized(selectedKeys.flip());
+ }
- assert !changeReqs.isEmpty();
+ if (!changeReqs.isEmpty())
+ continue mainLoop;
- continue;
- }
- finally {
- park = false;
- }
- }
+ // Just in case we do busy selects.
+ long now = U.currentTimeMillis();
- int res = 0;
+ if (now - lastIdleCheck > 2000) {
+ lastIdleCheck = now;
- for (long i = 0; i < selectorSpins && res == 0; i++)
- res = selector.selectNow();
+ checkIdle(selector.keys());
+ }
- if (res > 0) {
- // Walk through the ready keys collection and process network events.
- if (selectedKeys == null)
- processSelectedKeys(selector.selectedKeys());
- else
- processSelectedKeysOptimized(selectedKeys.flip());
+ if (isCancelled())
+ return;
}
- if (!changeReqs.isEmpty() ||
- (!disablePark && writer && pendingAcks == 0 && selectorSpins != 0))
- continue;
-
+ // Falling to blocking select.
select = true;
try {
@@ -1981,11 +1889,9 @@ public class GridNioServer<T> {
SelectionKey key = ses.key();
if (key.isValid()) {
- if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) {
+ if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- writeSesCnt++;
- }
// Update timestamp to protected against false write timeout.
ses.bytesSent(0);
@@ -2160,26 +2066,6 @@ public class GridNioServer<T> {
}
/**
- * @return Always {@code true} to put it to assert statement.
- */
- protected boolean consistent() {
- int wsCnt = 0;
-
- for (SelectionKey key : selector.keys()) {
- boolean opWrite = key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
-
- if (opWrite)
- wsCnt++;
- }
-
- assert wsCnt == writeSesCnt : "Worker in illegal state [actualWriteSesCnt=" + wsCnt +
- ", calculatedWriteSesCnt=" + writeSesCnt +
- ", worker=" + this + ']';
-
- return true;
- }
-
- /**
* Checks sessions assigned to a selector for timeouts.
*
* @param keys Keys registered to selector.
@@ -2221,9 +2107,6 @@ public class GridNioServer<T> {
close(ses, e);
}
}
-
- // For test purposes only!
- assert consistent();
}
/**
@@ -2341,12 +2224,6 @@ public class GridNioServer<T> {
// Shutdown input and output so that remote client will see correct socket close.
Socket sock = ((SocketChannel)key.channel()).socket();
- if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
- writeSesCnt--;
-
- assert writeSesCnt >= 0 : writeSesCnt;
- }
-
try {
try {
sock.shutdownInput();
@@ -3243,6 +3120,9 @@ public class GridNioServer<T> {
/** Name for threads identification. */
private String srvName;
+ /** */
+ private long selectorSpins;
+
/**
* Finishes building the instance.
*
@@ -3257,6 +3137,7 @@ public class GridNioServer<T> {
selectorCnt,
gridName,
srvName,
+ selectorSpins,
tcpNoDelay,
directBuf,
byteOrder,
@@ -3344,6 +3225,18 @@ public class GridNioServer<T> {
}
/**
+ * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ * @return This for chaining.
+ */
+ public Builder<T> selectorSpins(long selectorSpins) {
+ this.selectorSpins = selectorSpins;
+
+ return this;
+ }
+
+ /**
* @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
* @return This for chaining.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/70ac3bee/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index dc2e216..57481fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1000,6 +1000,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Count of selectors to use in TCP server. */
private int selectorsCnt = DFLT_SELECTORS_CNT;
+ /**
+ * Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ */
+ private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L); // TODO
+
/** Address resolver. */
private AddressResolver addrRslvr;
@@ -1416,6 +1423,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return selectorsCnt;
}
+ /** {@inheritDoc} */
+ @Override public long getSelectorSpins() {
+ return selectorSpins;
+ }
+
+ /**
+ * Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ *
+ * @param selectorSpins Selector thread busy-loop iterations.
+ */
+ public void setSelectorSpins(long selectorSpins) {
+ this.selectorSpins = selectorSpins;
+ }
+
/**
* Sets value for {@code TCP_NODELAY} socket option. Each
* socket will be opened using provided value.
@@ -2010,6 +2033,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.directMode(true)
.metricsListener(metricsLsnr)
.writeTimeout(sockWriteTimeout)
+ .selectorSpins(selectorSpins)
.filters(filters)
.writerFactory(writerFactory)
.skipRecoveryPredicate(skipRecoveryPred)
@@ -2022,7 +2046,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isInfoEnabled())
log.info("Successfully bound communication NIO server to TCP port " +
"[port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt=" + selectorsCnt +
- ", parkDisabled=" + srvr.parkDisabled() + ", selectorSpins=" + srvr.selectorSpins() + ']');
+ ", selectorSpins=" + srvr.selectorSpins() + ']');
srvr.idleTimeout(idleConnTimeout);
@@ -3276,8 +3300,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
GridNioRecoveryDescriptor old =
- recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log,
- ackSndThreshold));
+ recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
if (old != null)
recovery = old;
http://git-wip-us.apache.org/repos/asf/ignite/blob/70ac3bee/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 482e2ef..871e8a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -153,6 +153,16 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getReconnectCount();
/**
+ * Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ *
+ * @return Selector thread busy-loop iterations.
+ */
+ @MXBeanDescription("Selector thread busy-loop iterations.")
+ public long getSelectorSpins();
+
+ /**
* Gets value for {@code TCP_NODELAY} socket option.
*
* @return {@code True} if TCP delay is disabled.