You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:28 UTC
[05/50] [abbrv] ignite git commit: IGNITE-3054 - Queue per worker
IGNITE-3054 - Queue per worker
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2c8ee968
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2c8ee968
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2c8ee968
Branch: refs/heads/ignite-3054
Commit: 2c8ee96899bd20b35be15be20e3346426d6acf37
Parents: 49ee23d
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Nov 11 16:15:10 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Nov 11 16:15:10 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 125 +++++++------------
1 file changed, 47 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2c8ee968/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e6d6323..2a471c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -57,10 +58,13 @@ import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
@@ -154,6 +158,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
@@ -270,6 +275,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Mutex. */
private final Object mux = new Object();
+ /** Nio sender semaphore. */
+ private Semaphore nioSem;
+
/** Discovery state. */
protected TcpDiscoverySpiState spiState = DISCONNECTED;
@@ -280,8 +288,8 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Nio server that serves client connections. */
private GridNioServer clientNioSrv;
- /** Client NIO messages. */
- private BlockingDeque<NioMessage> clientNioMsgQueue;
+ /** Set of NIO workers. */
+ private Set<ClientNioMessageWorker> nioWorkers = new CopyOnWriteArraySet<>();
/**
* @param adapter Adapter.
@@ -357,7 +365,7 @@ class ServerImpl extends TcpDiscoveryImpl {
tcpSrvr = new TcpServer();
- clientNioMsgQueue = new LinkedBlockingDeque<>();
+ nioSem = new Semaphore(spi.getClientNioThreads());
clientNioSrv = createClientNioServer();
clientNioSrv.start();
@@ -367,7 +375,7 @@ class ServerImpl extends TcpDiscoveryImpl {
60_000L, new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < spi.getClientNioThreads(); i++)
- nioClientProcessingPool.submit(new NioSendWorker(gridName, log, clientNioMsgQueue));
+ nioClientProcessingPool.submit(new NioSendWorker(gridName, log));
spi.initLocalNode(tcpSrvr.port, true);
@@ -558,6 +566,8 @@ class ServerImpl extends TcpDiscoveryImpl {
IgniteUtils.shutdownNow(ServerImpl.class, utilityPool, log);
IgniteUtils.shutdownNow(ServerImpl.class, nioClientProcessingPool, log);
+ nioSem.release(Integer.MAX_VALUE);
+
U.interrupt(statsPrinter);
U.join(statsPrinter, log);
@@ -5541,12 +5551,15 @@ class ServerImpl extends TcpDiscoveryImpl {
/** */
private volatile ClientMessagePinger pinger;
- /** Messages that added before this processor was properly initialized. */
- private volatile Queue<T2<TcpDiscoveryAbstractMessage, byte[]>> msgQueue;
+ /** Client message queue. */
+ private final Deque<T2<TcpDiscoveryAbstractMessage, byte[]>> msgQueue;
/** Worker state. */
private volatile WorkerState state = WorkerState.NOT_STARTED;
+ /** Indicates whether messages are currently being sent to the client. Only one thread may send messages to a single client at a time, so that their order is preserved. */
+ private final AtomicBoolean sending = new AtomicBoolean(false);
+
/**
* @param clientNodeId Client node ID.
* @param sock Socket.
@@ -5555,7 +5568,7 @@ class ServerImpl extends TcpDiscoveryImpl {
this.clientNodeId = clientNodeId;
this.sock = sock;
- msgQueue = new ArrayDeque<>();
+ msgQueue = new ConcurrentLinkedDeque8<>();
}
/**
@@ -5624,6 +5637,8 @@ class ServerImpl extends TcpDiscoveryImpl {
state = WorkerState.STOPPED;
+ nioWorkers.remove(this);
+
return res;
}
@@ -5632,21 +5647,11 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
void markJoinedAndSendPendingMessages() {
if (state == WorkerState.STARTED) {
- synchronized (this) {
- if (msgQueue != null) {
- // process all pending messages
- while (!msgQueue.isEmpty()) {
- final T2<TcpDiscoveryAbstractMessage, byte[]> addedMsg = msgQueue.poll();
+ state = WorkerState.JOINED;
- sendMessage(addedMsg.get1(), addedMsg.get2());
- }
+ nioWorkers.add(this);
- msgQueue = null;
- }
-
- if (state == WorkerState.STARTED)
- state = WorkerState.JOINED;
- }
+ nioSem.release();
}
}
@@ -5685,20 +5690,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (state0 == WorkerState.STOPPED)
return;
- if (state0 == WorkerState.NOT_STARTED || state0 == WorkerState.STARTED) {
- synchronized (this) {
- if (state == WorkerState.NOT_STARTED || state == WorkerState.STARTED) {
- msgQueue.add(new T2<>(msg, msgBytes));
-
- return;
- }
- }
- }
-
if (msg.highPriority())
- sendMessage(msg, msgBytes);
+ msgQueue.addFirst(new T2<>(msg, msgBytes));
else
- clientNioMsgQueue.add(new NioMessage(msg, msgBytes, this));
+ msgQueue.add(new T2<>(msg, msgBytes));
+
+ nioSem.release();
}
/**
@@ -7968,66 +7965,38 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- *
- */
- private static class NioMessage {
- /** */
- private final TcpDiscoveryAbstractMessage msg;
-
- /** */
- private final byte[] msgData;
-
- /** */
- private final ClientNioMessageWorker worker;
-
- /**
- * @param msg Discovery message to send.
- * @param msgData Message bytes to send.
- * @param worker Client NIO worker.
- */
- NioMessage(final TcpDiscoveryAbstractMessage msg, final byte[] msgData,
- final ClientNioMessageWorker worker) {
- this.msg = msg;
- this.msgData = msgData;
- this.worker = worker;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(NioMessage.class, this);
- }
- }
-
- /**
* Actually marshals client nio messages to release ring worker from that routine.
*/
- private static class NioSendWorker extends GridWorker {
- /** */
- private final BlockingDeque<NioMessage> msgQueue;
-
+ private class NioSendWorker extends GridWorker {
/**
* @param gridName Grid name.
* @param log Logger.
- * @param queue Message queue.
*/
- NioSendWorker(@Nullable final String gridName, final IgniteLogger log,
- final BlockingDeque<NioMessage> queue) {
+ NioSendWorker(@Nullable final String gridName, final IgniteLogger log) {
super(gridName, "nio-client-sender", log);
- msgQueue = queue;
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
- final NioMessage msg = msgQueue.poll(2000, TimeUnit.MILLISECONDS);
+ for (final ClientNioMessageWorker worker : nioWorkers) {
+ if (worker.state == WorkerState.JOINED && worker.sending.compareAndSet(false, true)) {
+ T2<TcpDiscoveryAbstractMessage, byte[]> msg;
- try {
- if (msg != null)
- msg.worker.sendMessage(msg.msg, msg.msgData);
- }
- catch (Exception e) {
- log.error("Failed to send message to client: [nioMsg=" + msg + ']', e);
+ do {
+ msg = worker.msgQueue.poll();
+
+ if (msg != null)
+ worker.sendMessage(msg.get1(), msg.get2());
+ } while(msg != null);
+
+ boolean res = worker.sending.compareAndSet(true, false);
+
+ assert res;
+ }
}
+
+ nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
}
}
}