You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 15:26:37 UTC
[47/68] [abbrv] ignite git commit: ignite-comm-opts2
ignite-comm-opts2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d7d198e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d7d198e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d7d198e1
Branch: refs/heads/ignite-comm-opts2
Commit: d7d198e17302142f5ac7be3e2df88f0155fd2323
Parents: 0484196
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 26 15:50:05 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 26 15:50:05 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 20 ++++++++++++++++----
.../util/nio/GridSelectorNioSessionImpl.java | 17 ++++++++++-------
2 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7d198e1/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 8ad7bde..b590d81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -455,9 +455,10 @@ public class GridNioServer<T> {
if (ses.removeFuture(fut))
fut.connectionClosed();
}
- else if (msgCnt == 1)
- // Change from 0 to 1 means that worker thread should be waken up.
- clientWorkers.get(ses.selectorIndex()).offer(fut);
+ else {
+ if (!ses.processWrite.get() && ses.processWrite.compareAndSet(false, true))
+ clientWorkers.get(ses.selectorIndex()).offer(fut);
+ }
if (msgQueueLsnr != null)
msgQueueLsnr.apply(ses, msgCnt);
@@ -1199,7 +1200,18 @@ public class GridNioServer<T> {
req = (NioOperationFuture<?>)ses.pollFuture();
if (req == null && buf.position() == 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ if (ses.processWrite.get()) {
+ boolean set = ses.processWrite.compareAndSet(true, false);
+
+ assert set;
+
+ if (ses.writeQueue().isEmpty()) {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ else {
+ ses.processWrite.set(true);
+ }
+ }
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7d198e1/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 88721ff..ee494b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Collection;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -47,7 +48,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
private final int selectorIdx;
/** Size counter. */
- private final AtomicInteger queueSize = new AtomicInteger();
+ //private final AtomicInteger queueSize = new AtomicInteger();
/** Semaphore. */
@GridToStringExclude
@@ -68,6 +69,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
/** Logger. */
private final IgniteLogger log;
+ public final AtomicBoolean processWrite = new AtomicBoolean();
+
/**
* Creates session instance.
*
@@ -173,7 +176,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
assert res : "Future was not added to queue";
- return queueSize.incrementAndGet();
+ return 0;//queueSize.incrementAndGet();
}
/**
@@ -198,7 +201,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
assert res : "Future was not added to queue";
- return queueSize.incrementAndGet();
+ return 0;//queueSize.incrementAndGet();
}
/**
@@ -211,9 +214,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
assert add;
- boolean set = queueSize.compareAndSet(0, futs.size());
+ //boolean set = queueSize.compareAndSet(0, futs.size());
- assert set;
+ //assert set;
}
/**
@@ -223,7 +226,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
GridNioFuture<?> last = queue.poll();
if (last != null) {
- queueSize.decrementAndGet();
+ //queueSize.decrementAndGet();
if (sem != null && !last.messageThread())
sem.release();
@@ -264,7 +267,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
* @return Number of write requests.
*/
int writeQueueSize() {
- return queueSize.get();
+ return queue.sizex();//queueSize.get();
}
/**