You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:28 UTC

[05/50] [abbrv] ignite git commit: IGNITE-3054 - Queue per worker

IGNITE-3054 - Queue per worker


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2c8ee968
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2c8ee968
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2c8ee968

Branch: refs/heads/ignite-3054
Commit: 2c8ee96899bd20b35be15be20e3346426d6acf37
Parents: 49ee23d
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Nov 11 16:15:10 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Nov 11 16:15:10 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 125 +++++++------------
 1 file changed, 47 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2c8ee968/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e6d6323..2a471c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -57,10 +58,13 @@ import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
@@ -154,6 +158,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
@@ -270,6 +275,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Mutex. */
     private final Object mux = new Object();
 
+    /** Nio sender semaphore. */
+    private Semaphore nioSem;
+
     /** Discovery state. */
     protected TcpDiscoverySpiState spiState = DISCONNECTED;
 
@@ -280,8 +288,8 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Nio server that serves client connections. */
     private GridNioServer clientNioSrv;
 
-    /** Client NIO messages. */
-    private BlockingDeque<NioMessage> clientNioMsgQueue;
+    /** List of nio workers. */
+    private Set<ClientNioMessageWorker> nioWorkers = new CopyOnWriteArraySet<>();
 
     /**
      * @param adapter Adapter.
@@ -357,7 +365,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         tcpSrvr = new TcpServer();
 
-        clientNioMsgQueue = new LinkedBlockingDeque<>();
+        nioSem = new Semaphore(spi.getClientNioThreads());
 
         clientNioSrv = createClientNioServer();
         clientNioSrv.start();
@@ -367,7 +375,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             60_000L, new LinkedBlockingQueue<Runnable>());
 
         for (int i = 0; i < spi.getClientNioThreads(); i++)
-            nioClientProcessingPool.submit(new NioSendWorker(gridName, log, clientNioMsgQueue));
+            nioClientProcessingPool.submit(new NioSendWorker(gridName, log));
 
         spi.initLocalNode(tcpSrvr.port, true);
 
@@ -558,6 +566,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         IgniteUtils.shutdownNow(ServerImpl.class, utilityPool, log);
         IgniteUtils.shutdownNow(ServerImpl.class, nioClientProcessingPool, log);
 
+        nioSem.release(Integer.MAX_VALUE);
+
         U.interrupt(statsPrinter);
         U.join(statsPrinter, log);
 
@@ -5541,12 +5551,15 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** */
         private volatile ClientMessagePinger pinger;
 
-        /** Messages that added before this processor was properly initialized. */
-        private volatile Queue<T2<TcpDiscoveryAbstractMessage, byte[]>> msgQueue;
+        /** Client message queue. */
+        private final Deque<T2<TcpDiscoveryAbstractMessage, byte[]>> msgQueue;
 
         /** Worker state. */
         private volatile WorkerState state = WorkerState.NOT_STARTED;
 
+        /** Indicates whether messages are sent to client. Only one thread can send messages to single client to keep their order. */
+        private final AtomicBoolean sending = new AtomicBoolean(false);
+
         /**
          * @param clientNodeId Client node ID.
          * @param sock Socket.
@@ -5555,7 +5568,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.clientNodeId = clientNodeId;
             this.sock = sock;
 
-            msgQueue = new ArrayDeque<>();
+            msgQueue = new ConcurrentLinkedDeque8<>();
         }
 
         /**
@@ -5624,6 +5637,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             state = WorkerState.STOPPED;
 
+            nioWorkers.remove(this);
+
             return res;
         }
 
@@ -5632,21 +5647,11 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         void markJoinedAndSendPendingMessages() {
             if (state == WorkerState.STARTED) {
-                synchronized (this) {
-                    if (msgQueue != null) {
-                        // process all pending messages
-                        while (!msgQueue.isEmpty()) {
-                            final T2<TcpDiscoveryAbstractMessage, byte[]> addedMsg = msgQueue.poll();
+                state = WorkerState.JOINED;
 
-                            sendMessage(addedMsg.get1(), addedMsg.get2());
-                        }
+                nioWorkers.add(this);
 
-                        msgQueue = null;
-                    }
-
-                    if (state == WorkerState.STARTED)
-                        state = WorkerState.JOINED;
-                }
+                nioSem.release();
             }
         }
 
@@ -5685,20 +5690,12 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (state0 == WorkerState.STOPPED)
                 return;
 
-            if (state0 == WorkerState.NOT_STARTED || state0 == WorkerState.STARTED) {
-                synchronized (this) {
-                    if (state == WorkerState.NOT_STARTED || state == WorkerState.STARTED) {
-                        msgQueue.add(new T2<>(msg, msgBytes));
-
-                        return;
-                    }
-                }
-            }
-
             if (msg.highPriority())
-                sendMessage(msg, msgBytes);
+                msgQueue.addFirst(new T2<>(msg, msgBytes));
             else
-                clientNioMsgQueue.add(new NioMessage(msg, msgBytes, this));
+                msgQueue.add(new T2<>(msg, msgBytes));
+
+            nioSem.release();
         }
 
         /**
@@ -7968,66 +7965,38 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     *
-     */
-    private static class NioMessage {
-        /** */
-        private final TcpDiscoveryAbstractMessage msg;
-
-        /** */
-        private final byte[] msgData;
-
-        /** */
-        private final ClientNioMessageWorker worker;
-
-        /**
-         * @param msg Discovery message to send.
-         * @param msgData Message bytes to send.
-         * @param worker Client NIO worker.
-         */
-        NioMessage(final TcpDiscoveryAbstractMessage msg, final byte[] msgData,
-            final ClientNioMessageWorker worker) {
-            this.msg = msg;
-            this.msgData = msgData;
-            this.worker = worker;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(NioMessage.class, this);
-        }
-    }
-
-    /**
      * Actually marshals client nio messages to release ring worker from that routine.
      */
-    private static class NioSendWorker extends GridWorker {
-        /** */
-        private final BlockingDeque<NioMessage> msgQueue;
-
+    private class NioSendWorker extends GridWorker {
         /**
          * @param gridName Grid name.
          * @param log Logger.
-         * @param queue Message queue.
          */
-        NioSendWorker(@Nullable final String gridName, final IgniteLogger log,
-            final BlockingDeque<NioMessage> queue) {
+        NioSendWorker(@Nullable final String gridName, final IgniteLogger log) {
             super(gridName, "nio-client-sender", log);
-            msgQueue = queue;
         }
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             while (!isCancelled()) {
-                final NioMessage msg = msgQueue.poll(2000, TimeUnit.MILLISECONDS);
+                for (final ClientNioMessageWorker worker : nioWorkers) {
+                    if (worker.state == WorkerState.JOINED && worker.sending.compareAndSet(false, true)) {
+                        T2<TcpDiscoveryAbstractMessage, byte[]> msg;
 
-                try {
-                    if (msg != null)
-                        msg.worker.sendMessage(msg.msg, msg.msgData);
-                }
-                catch (Exception e) {
-                    log.error("Failed to send message to client: [nioMsg=" + msg + ']', e);
+                        do {
+                            msg = worker.msgQueue.poll();
+
+                            if (msg != null)
+                                worker.sendMessage(msg.get1(), msg.get2());
+                        } while(msg != null);
+
+                        boolean res = worker.sending.compareAndSet(true, false);
+
+                        assert res;
+                    }
                 }
+
+                nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
             }
         }
     }