You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:38 UTC

[15/50] [abbrv] ignite git commit: IGNITE-3054 - Send batch

IGNITE-3054 - Send batch


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b96fbe7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b96fbe7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b96fbe7

Branch: refs/heads/ignite-3054
Commit: 3b96fbe7b0fbec03521c0573dbd0c359fff6d748
Parents: 3f71bc7
Author: dkarachentsev <dk...@gridgain.com>
Authored: Sun Nov 20 23:44:36 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Sun Nov 20 23:44:36 2016 +0300

----------------------------------------------------------------------
 .../internal/util/nio/GridBufferedParser.java   |  2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 50 ++++++++++++++++----
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  4 +-
 3 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b96fbe7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 303991d..954e70e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -38,7 +38,7 @@ public class GridBufferedParser implements GridNioParser {
     private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
-    private final boolean directBuf;
+    protected final boolean directBuf;
 
     /** */
     private final ByteOrder order; // TODO: GG-6460

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b96fbe7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e35b956..de38709 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -115,6 +115,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -207,6 +208,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Node ID in GridNioSession. */
     private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Size of the send queue. */
+    private static final int MSG_QUEUE_SIZE_META = GridNioSessionMetaKey.nextUniqueKey();
+
     /**
      * Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
      * <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
@@ -487,7 +491,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Respond to client without message length.
                 byte[] msg0 = (byte[])msg;
 
-                ByteBuffer res = ByteBuffer.allocateDirect(msg0.length);
+                ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length) : ByteBuffer.wrap(msg0);
 
                 res.put(msg0);
 
@@ -538,6 +542,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 .gridName(gridName)
                 .daemon(false)
                 .writeTimeout(writeTimeout)
+                .messageQueueSizeListener(new IgniteBiInClosure<GridNioSession, Integer>() {
+                    @Override public void apply(GridNioSession ses, Integer size) {
+                        ses.addMeta(MSG_QUEUE_SIZE_META, size);
+                    }
+                })
                 .build();
         }
         catch (Exception e) {
@@ -5667,7 +5676,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ((NioSslSocket)sock).sslEngine);
             }
 
-            ses = (GridNioSession)clientNioSrv.createSession(ch, meta).get();
+            ses = clientNioSrv.createSession(ch, meta).get();
 
             state = WorkerState.STARTED;
         }
@@ -5694,6 +5703,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (state == WorkerState.STOPPED)
                 return new GridFinishedFuture<>();
 
+            log.warning("== " + ses);
+
             final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>, Object>() {
                 @Override public Object apply(final IgniteInternalFuture<Boolean> fut) {
                     try {
@@ -5847,6 +5858,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             return state;
         }
 
+        /**
+         * @return Send queue size after adding message, in other words it may return 0
+         * only when no messages were sent after creation.
+         */
+        int queueSize() {
+            Integer size = ses.meta(MSG_QUEUE_SIZE_META);
+
+            return size == null ? 0 : size;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(ClientNioMessageWorker.class, this);
@@ -6060,7 +6081,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             ack.verify(locNodeId);
 
-            clientMsgWrk.addMessage(ack, null);
+            clientMsgWrk.sendMessage(ack, null);
 
             if (heartbeatMsg != null)
                 clientMsgWrk.metrics(heartbeatMsg.metrics());
@@ -7759,17 +7780,27 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            boolean skipWait = false;
+            final int batchSize = 50;
+
             while (!isCancelled()) {
                 for (final ClientNioMessageWorker worker : nioWorkers) {
                     if (worker.state == WorkerState.JOINED && worker.sending.compareAndSet(false, true)) {
                         T2<TcpDiscoveryAbstractMessage, byte[]> msg;
 
-                        do {
-                            msg = worker.msgQueue.poll();
+                        if (worker.queueSize() < 2) {
+                            int cnt = batchSize;
+
+                            do {
+                                msg = worker.msgQueue.poll();
 
-                            if (msg != null)
-                                worker.sendMessage(msg.get1(), msg.get2());
-                        } while(msg != null);
+                                if (msg != null)
+                                    worker.sendMessage(msg.get1(), msg.get2());
+
+                                skipWait = msg != null;
+                            }
+                            while (msg != null && --cnt > 0);
+                        }
 
                         boolean res = worker.sending.compareAndSet(true, false);
 
@@ -7777,7 +7808,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
-                nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
+                if (!skipWait)
+                    nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b96fbe7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 19e5249..b22e9c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -295,8 +295,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Default max number of messages that could be queued to send to client. (value is <tt>0</tt>).  */
     public static final int DFLT_CLIENT_SEND_MSG_QUEUE_LIMIT = 0;
 
-    /** Default value for use direct or heap buffer. (value is <tt>true</tt>) */
-    public static final boolean DFLT_CLIENT_NIO_DIRECT_BUF = true;
+    /** Default value for use direct or heap buffer. (value is <tt>false</tt>) */
+    public static final boolean DFLT_CLIENT_NIO_DIRECT_BUF = false;
 
     /** Default byte order for nio client buffers. (value is <tt>ByteOrder.nativeOrder()</tt>) */
     public static final ByteOrder DFLT_CLIENT_NIO_BYTE_ORDER = ByteOrder.nativeOrder();