You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:42 UTC

[19/50] [abbrv] ignite git commit: IGNITE-3054 - WIP

IGNITE-3054 - WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2274e0c2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2274e0c2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2274e0c2

Branch: refs/heads/ignite-3054
Commit: 2274e0c2b82b6a6fcdf2ffd6598636878cd77b08
Parents: 3b96fbe
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Dec 2 11:28:29 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Dec 2 11:28:29 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 48 ++++----------------
 1 file changed, 8 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2274e0c2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index de38709..0e0fd4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -208,9 +208,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Node ID in GridNioSession. */
     private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
 
-    /** Size of the send queue. */
-    private static final int MSG_QUEUE_SIZE_META = GridNioSessionMetaKey.nextUniqueKey();
-
     /**
      * Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
      * <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
@@ -427,7 +424,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         nioSem = new Semaphore(spi.getClientNioThreads());
 
-        clientNioSrv = createClientNioServer();
+        clientNioSrv = createClientNioServer(gridName);
         clientNioSrv.start();
 
         nioClientProcessingPool = new IgniteThreadPoolExecutor(
@@ -479,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * @return New NIO server.
      */
-    private GridNioServer<byte[]> createClientNioServer() {
+    private GridNioServer<byte[]> createClientNioServer(String gridName) {
         final GridNioServer<byte[]> srv;
 
         final ArrayList<GridNioFilter> filters = new ArrayList<>();
@@ -493,9 +490,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length) : ByteBuffer.wrap(msg0);
 
-                res.put(msg0);
+                if (directBuf) {
+                    res.put(msg0);
 
-                res.flip();
+                    res.flip();
+                }
 
                 return res;
             }
@@ -520,11 +519,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             final long writeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
 
-            String gridName = "client";
-
-            if (Thread.currentThread() instanceof IgniteThread)
-                gridName = ((IgniteThread)Thread.currentThread()).getGridName();
-
             srv = GridNioServer.<byte[]>builder().address(U.getLocalHost())
                 .port(-1)
                 .listener(clientLsnr)
@@ -542,11 +536,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 .gridName(gridName)
                 .daemon(false)
                 .writeTimeout(writeTimeout)
-                .messageQueueSizeListener(new IgniteBiInClosure<GridNioSession, Integer>() {
-                    @Override public void apply(GridNioSession ses, Integer size) {
-                        ses.addMeta(MSG_QUEUE_SIZE_META, size);
-                    }
-                })
                 .build();
         }
         catch (Exception e) {
@@ -5703,8 +5692,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (state == WorkerState.STOPPED)
                 return new GridFinishedFuture<>();
 
-            log.warning("== " + ses);
-
             final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>, Object>() {
                 @Override public Object apply(final IgniteInternalFuture<Boolean> fut) {
                     try {
@@ -5858,16 +5845,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             return state;
         }
 
-        /**
-         * @return Send queue size after adding message, in other words it may return 0
-         * only when no messages were sent after creation.
-         */
-        int queueSize() {
-            Integer size = ses.meta(MSG_QUEUE_SIZE_META);
-
-            return size == null ? 0 : size;
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(ClientNioMessageWorker.class, this);
@@ -7780,27 +7757,19 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            boolean skipWait = false;
-            final int batchSize = 50;
-
             while (!isCancelled()) {
                 for (final ClientNioMessageWorker worker : nioWorkers) {
                     if (worker.state == WorkerState.JOINED && worker.sending.compareAndSet(false, true)) {
                         T2<TcpDiscoveryAbstractMessage, byte[]> msg;
 
-                        if (worker.queueSize() < 2) {
-                            int cnt = batchSize;
-
                             do {
                                 msg = worker.msgQueue.poll();
 
                                 if (msg != null)
                                     worker.sendMessage(msg.get1(), msg.get2());
 
-                                skipWait = msg != null;
                             }
-                            while (msg != null && --cnt > 0);
-                        }
+                            while (msg != null);
 
                         boolean res = worker.sending.compareAndSet(true, false);
 
@@ -7808,8 +7777,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
-                if (!skipWait)
-                    nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
+                nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
             }
         }
     }