You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/22 14:42:46 UTC
[08/12] ignite git commit: conn
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81832e1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81832e1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81832e1d
Branch: refs/heads/ignite-comm-balance
Commit: 81832e1dc9576fee9c8f74c451f4893cd5633d99
Parents: ffd654a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:02:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 14:23:04 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 131 ++++++++++++-------
.../ignite/internal/util/nio/GridNioWorker.java | 19 ++-
.../util/nio/GridSelectorNioSessionImpl.java | 62 ++++++++-
.../IgniteCommunicationBalanceTest.java | 2 +-
4 files changed, 161 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5da557b..2d5cc64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -435,10 +435,6 @@ public class GridNioServer<T> {
NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
impl.offerStateChange(fut);
-// int idx = impl.selectorIndex(); // TODO
-//
-// if (idx != -1)
-// clientWorkers.get(idx).offer(fut);
return fut;
}
@@ -498,14 +494,9 @@ public class GridNioServer<T> {
if (ses.removeFuture(fut))
fut.connectionClosed();
}
- else if (msgCnt == 1) {
+ else if (msgCnt == 1)
// Change from 0 to 1 means that worker thread should be waken up.
-// int idx = ses.selectorIndex();
-//
-// if (idx != -1) // TODO revisit
-// clientWorkers.get(idx).offer(fut);
ses.offerStateChange(fut);
- }
if (msgQueueLsnr != null)
msgQueueLsnr.apply(ses, msgCnt);
@@ -578,7 +569,6 @@ public class GridNioServer<T> {
ses0.resend(futs);
// Wake up worker.
- //clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
ses0.offerStateChange(fut0);
}
}
@@ -598,6 +588,10 @@ public class GridNioServer<T> {
}
public void moveSession(GridNioSession ses, int from, int to) {
+ assert from >= 0 && from < clientWorkers.size() : from;
+ assert to >= 0 && to < clientWorkers.size() : to;
+ assert from != to;
+
clientWorkers.get(from).offer(new SessionMoveFuture((GridSelectorNioSessionImpl)ses, to));
}
@@ -1336,13 +1330,15 @@ public class GridNioServer<T> {
/** Worker index. */
private final int idx;
+ /** Sessions assigned to this worker. */
+ private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
+ new GridConcurrentHashSet<>();
+
private volatile long bytesRcvd;
private volatile long bytesSent;
private volatile long bytesRcvd0;
private volatile long bytesSent0;
- private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 = new GridConcurrentHashSet<>();
-
/**
* @param idx Index of this worker in server's array.
* @param gridName Grid name.
@@ -1360,7 +1356,7 @@ public class GridNioServer<T> {
}
public Collection<? extends GridNioSession> sessions() {
- return sessions0;
+ return workerSessions;
}
/** {@inheritDoc} */
@@ -1409,15 +1405,15 @@ public class GridNioServer<T> {
try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
- Class<?> selectorImplClass =
+ Class<?> selectorImplCls =
Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
// Ensure the current selector implementation is what we can instrument.
- if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+ if (!selectorImplCls.isAssignableFrom(selector.getClass()))
return;
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+ Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys");
+ Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
@@ -1439,17 +1435,41 @@ public class GridNioServer<T> {
}
}
- /**
- * Adds socket channel to the registration queue and wakes up reading thread.
- *
- * @param req Change request.
- */
+ /** {@inheritDoc} */
@Override public void offer(GridNioFuture req) {
changeReqs.offer((NioOperationFuture)req);
selector.wakeup();
}
+ /** {@inheritDoc} */
+ @Override public void offer(Collection<GridNioFuture> reqs) {
+ for (GridNioFuture req : reqs)
+ changeReqs.offer((NioOperationFuture)req);
+
+ selector.wakeup();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<GridNioFuture> clearSessionRequests(GridNioSession ses) {
+ List<GridNioFuture> sesReqs = null;
+
+ for (GridNioServer.NioOperationFuture changeReq : changeReqs) {
+ if (changeReq.session() == ses) {
+ boolean rmv = changeReqs.remove(changeReq);
+
+ assert rmv : changeReq;
+
+ if (sesReqs == null)
+ sesReqs = new ArrayList<>();
+
+ sesReqs.add(changeReq);
+ }
+ }
+
+ return sesReqs;
+ }
+
/**
* Processes read and write events and registration requests.
*
@@ -1477,25 +1497,29 @@ public class GridNioServer<T> {
GridSelectorNioSessionImpl ses = f.session();
if (idx == f.toIdx) {
- ses.worker = this;
+ assert f.movedSocketChannel() != null : f;
+
+ boolean add = workerSessions.add(ses);
- sessions0.add(ses);
+ assert add;
- SelectionKey key = f.socketChannel().register(selector,
- SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused?
+ ses.finishMoveSession(this);
+
+ SelectionKey key = f.movedSocketChannel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+ ses);
ses.key(key);
}
else {
- if (sessions0.remove(ses)) {
- assert ses.worker == this; // TODO replace with IF and ignore?
+ assert f.movedSocketChannel() == null : f;
- // Cleanup.
- ses.worker = null;
+ if (workerSessions.remove(ses)) {
+ ses.startMoveSession(this);
SelectionKey key = ses.key();
- f.socketChannel((SocketChannel)key.channel());
+ f.movedSocketChannel((SocketChannel)key.channel());
key.cancel();
@@ -1879,7 +1903,7 @@ public class GridNioServer<T> {
resend(ses);
sessions.add(ses);
- sessions0.add(ses);
+ workerSessions.add(ses);
try {
filterChain.onSessionOpened(ses);
@@ -1922,7 +1946,7 @@ public class GridNioServer<T> {
}
sessions.remove(ses);
- sessions0.remove(ses);
+ workerSessions.remove(ses);
SelectionKey key = ses.key();
@@ -2038,7 +2062,7 @@ public class GridNioServer<T> {
bytesSent0 = 0;
bytesRcvd0 = 0;
- for (GridSelectorNioSessionImpl ses : sessions0)
+ for (GridSelectorNioSessionImpl ses : workerSessions)
ses.reset0();
}
}
@@ -2248,7 +2272,7 @@ public class GridNioServer<T> {
/** Socket channel in register request. */
@GridToStringExclude
- protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy
+ private SocketChannel sockCh;
/** Session to perform operation on. */
@GridToStringExclude
@@ -2450,11 +2474,15 @@ public class GridNioServer<T> {
/** */
private final int toIdx;
+ /** */
+ @GridToStringExclude
+ private SocketChannel movedSockCh;
+
/**
- * @param ses
- * @param toIdx
+ * @param ses Session.
+ * @param toIdx Target worker index.
*/
- public SessionMoveFuture(
+ SessionMoveFuture(
GridSelectorNioSessionImpl ses,
int toIdx
) {
@@ -2463,12 +2491,25 @@ public class GridNioServer<T> {
this.toIdx = toIdx;
}
+ /**
+ * @return Target worker index.
+ */
int toIndex() {
return toIdx;
}
- void socketChannel(SocketChannel sockCh) {
- this.sockCh = sockCh;
+ /**
+ * @return Moved session socket channel.
+ */
+ SocketChannel movedSocketChannel() {
+ return movedSockCh;
+ }
+
+ /**
+ * @param movedSockCh Moved session socket channel.
+ */
+ void movedSocketChannel(SocketChannel movedSockCh) {
+ this.movedSockCh = movedSockCh;
}
/** {@inheritDoc} */
@@ -2938,7 +2979,7 @@ public class GridNioServer<T> {
long bytesRcvd0 = worker.bytesRcvd0;
if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 &&
- worker.sessions0.size() > 1) {
+ worker.workerSessions.size() > 1) {
maxRcvd0 = bytesRcvd0;
maxRcvdIdx = i;
@@ -2955,7 +2996,7 @@ public class GridNioServer<T> {
long bytesSent0 = worker.bytesSent0;
if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 &&
- worker.sessions0.size() > 1) {
+ worker.workerSessions.size() > 1) {
maxSent0 = bytesSent0;
maxSentIdx = i;
@@ -2989,7 +3030,7 @@ public class GridNioServer<T> {
double threshold = sentDiff * 0.9;
GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
- clientWorkers.get(maxSentIdx).sessions0;
+ clientWorkers.get(maxSentIdx).workerSessions;
for (GridSelectorNioSessionImpl ses0 : sessions) {
long bytesSent0 = ses0.bytesSent0();
@@ -3031,7 +3072,7 @@ public class GridNioServer<T> {
double threshold = rcvdDiff * 0.9;
GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
- clientWorkers.get(maxRcvdIdx).sessions0;
+ clientWorkers.get(maxRcvdIdx).workerSessions;
for (GridSelectorNioSessionImpl ses0 : sessions) {
long bytesRcvd0 = ses0.bytesReceived0();
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index d088d8c..7f8033a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -17,12 +17,27 @@
package org.apache.ignite.internal.util.nio;
+import java.util.Collection;
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
/**
*
*/
public interface GridNioWorker {
/**
- * @param fut Future.
+ * @param req Change request.
+ */
+ void offer(GridNioFuture req);
+
+ /**
+ * @param reqs Change requests.
+ */
+ void offer(Collection<GridNioFuture> reqs);
+
+ /**
+ * @param ses Session.
+ * @return Session state change requests.
*/
- void offer(GridNioFuture fut);
+ @Nullable List<GridNioFuture> clearSessionRequests(GridNioSession ses);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index e515696..0c2033b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteLogger;
@@ -43,8 +45,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
@GridToStringExclude
private SelectionKey key;
- /** */
- public GridNioWorker worker;
+ /** Current worker thread. */
+ private GridNioWorker worker;
/** Size counter. */
private final AtomicInteger queueSize = new AtomicInteger();
@@ -68,6 +70,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
/** Logger. */
private final IgniteLogger log;
+ /** */
+ private List<GridNioFuture> pendingStateChanges;
+
/**
* Creates session instance.
*
@@ -153,13 +158,60 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
return key;
}
+ /**
+ * @param fut
+ */
void offerStateChange(GridNioFuture fut) {
- GridNioWorker worker0 = worker;
+ synchronized (this) {
+ GridNioWorker worker0 = worker;
+
+ if (worker0 == null) {
+ if (pendingStateChanges == null)
+ pendingStateChanges = new ArrayList<>();
+
+ pendingStateChanges.add(fut);
+ }
+ else
+ worker0.offer(fut);
+ }
+ }
+
+ /**
+ * @param moveFrom
+ */
+ void startMoveSession(GridNioWorker moveFrom) {
+ synchronized (this) {
+ assert this.worker == moveFrom;
- if (worker0 != null)
- worker0.offer(fut);
+ List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this);
+
+ worker = null;
+
+ if (sesReqs != null) {
+ if (pendingStateChanges == null)
+ pendingStateChanges = new ArrayList<>();
+
+ pendingStateChanges.addAll(sesReqs);
+ }
+ }
}
+ /**
+ * @param moveTo
+ */
+ void finishMoveSession(GridNioWorker moveTo) {
+ synchronized (this) {
+ assert worker == null;
+
+ worker = moveTo;
+
+ if (pendingStateChanges != null) {
+ moveTo.offer(pendingStateChanges);
+
+ pendingStateChanges = null;
+ }
+ }
+ }
/**
* Adds write future at the front of the queue without acquiring back pressure semaphore.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index d523aab..839bd77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -147,7 +147,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
/**
*
*/
- static class DummyRunnable implements IgniteRunnable {
+ private static class DummyRunnable implements IgniteRunnable {
/** {@inheritDoc} */
@Override public void run() {
// No-op.