You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:38 UTC
[15/50] [abbrv] ignite git commit: IGNITE-3054 - Send batch
IGNITE-3054 - Send batch
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b96fbe7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b96fbe7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b96fbe7
Branch: refs/heads/ignite-3054
Commit: 3b96fbe7b0fbec03521c0573dbd0c359fff6d748
Parents: 3f71bc7
Author: dkarachentsev <dk...@gridgain.com>
Authored: Sun Nov 20 23:44:36 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Sun Nov 20 23:44:36 2016 +0300
----------------------------------------------------------------------
.../internal/util/nio/GridBufferedParser.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 50 ++++++++++++++++----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +-
3 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b96fbe7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 303991d..954e70e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -38,7 +38,7 @@ public class GridBufferedParser implements GridNioParser {
private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
/** */
- private final boolean directBuf;
+ protected final boolean directBuf;
/** */
private final ByteOrder order; // TODO: GG-6460
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b96fbe7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e35b956..de38709 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -115,6 +115,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
@@ -207,6 +208,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Node ID in GridNioSession. */
private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Meta key under which the send queue size is stored in GridNioSession. */
+ private static final int MSG_QUEUE_SIZE_META = GridNioSessionMetaKey.nextUniqueKey();
+
/**
* Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
* <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
@@ -487,7 +491,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Respond to client without message length.
byte[] msg0 = (byte[])msg;
- ByteBuffer res = ByteBuffer.allocateDirect(msg0.length);
+ ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length) : ByteBuffer.wrap(msg0);
res.put(msg0);
@@ -538,6 +542,11 @@ class ServerImpl extends TcpDiscoveryImpl {
.gridName(gridName)
.daemon(false)
.writeTimeout(writeTimeout)
+ .messageQueueSizeListener(new IgniteBiInClosure<GridNioSession, Integer>() {
+ @Override public void apply(GridNioSession ses, Integer size) {
+ ses.addMeta(MSG_QUEUE_SIZE_META, size);
+ }
+ })
.build();
}
catch (Exception e) {
@@ -5667,7 +5676,7 @@ class ServerImpl extends TcpDiscoveryImpl {
meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ((NioSslSocket)sock).sslEngine);
}
- ses = (GridNioSession)clientNioSrv.createSession(ch, meta).get();
+ ses = clientNioSrv.createSession(ch, meta).get();
state = WorkerState.STARTED;
}
@@ -5694,6 +5703,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (state == WorkerState.STOPPED)
return new GridFinishedFuture<>();
+ log.warning("== " + ses);
+
final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>, Object>() {
@Override public Object apply(final IgniteInternalFuture<Boolean> fut) {
try {
@@ -5847,6 +5858,16 @@ class ServerImpl extends TcpDiscoveryImpl {
return state;
}
+ /**
+ * @return Send queue size as last reported by the message queue size listener,
+ * or 0 if no size has been stored for this session yet.
+ */
+ int queueSize() {
+ Integer size = ses.meta(MSG_QUEUE_SIZE_META);
+
+ return size == null ? 0 : size;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientNioMessageWorker.class, this);
@@ -6060,7 +6081,7 @@ class ServerImpl extends TcpDiscoveryImpl {
ack.verify(locNodeId);
- clientMsgWrk.addMessage(ack, null);
+ clientMsgWrk.sendMessage(ack, null);
if (heartbeatMsg != null)
clientMsgWrk.metrics(heartbeatMsg.metrics());
@@ -7759,17 +7780,27 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ boolean skipWait = false;
+ final int batchSize = 50;
+
while (!isCancelled()) {
for (final ClientNioMessageWorker worker : nioWorkers) {
if (worker.state == WorkerState.JOINED && worker.sending.compareAndSet(false, true)) {
T2<TcpDiscoveryAbstractMessage, byte[]> msg;
- do {
- msg = worker.msgQueue.poll();
+ if (worker.queueSize() < 2) {
+ int cnt = batchSize;
+
+ do {
+ msg = worker.msgQueue.poll();
- if (msg != null)
- worker.sendMessage(msg.get1(), msg.get2());
- } while(msg != null);
+ if (msg != null)
+ worker.sendMessage(msg.get1(), msg.get2());
+
+ skipWait = msg != null;
+ }
+ while (msg != null && --cnt > 0);
+ }
boolean res = worker.sending.compareAndSet(true, false);
@@ -7777,7 +7808,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
+ if (!skipWait)
+ nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b96fbe7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 19e5249..b22e9c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -295,8 +295,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Default max number of messages that could be queued to send to client. (value is <tt>0</tt>). */
public static final int DFLT_CLIENT_SEND_MSG_QUEUE_LIMIT = 0;
- /** Default value for use direct or heap buffer. (value is <tt>true</tt>) */
- public static final boolean DFLT_CLIENT_NIO_DIRECT_BUF = true;
+ /** Default for whether to use direct or heap buffers. (value is <tt>false</tt>) */
+ public static final boolean DFLT_CLIENT_NIO_DIRECT_BUF = false;
/** Default byte order for nio client buffers. (value is <tt>ByteOrder.nativeOrder()</tt>) */
public static final ByteOrder DFLT_CLIENT_NIO_BYTE_ORDER = ByteOrder.nativeOrder();