You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 15:26:37 UTC

[47/68] [abbrv] ignite git commit: ignite-comm-opts2

ignite-comm-opts2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d7d198e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d7d198e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d7d198e1

Branch: refs/heads/ignite-comm-opts2
Commit: d7d198e17302142f5ac7be3e2df88f0155fd2323
Parents: 0484196
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 26 15:50:05 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 26 15:50:05 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 20 ++++++++++++++++----
 .../util/nio/GridSelectorNioSessionImpl.java    | 17 ++++++++++-------
 2 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d7d198e1/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 8ad7bde..b590d81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -455,9 +455,10 @@ public class GridNioServer<T> {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();
         }
-        else if (msgCnt == 1)
-            // Change from 0 to 1 means that worker thread should be waken up.
-            clientWorkers.get(ses.selectorIndex()).offer(fut);
+        else {
+            if (!ses.processWrite.get() && ses.processWrite.compareAndSet(false, true))
+                clientWorkers.get(ses.selectorIndex()).offer(fut);
+        }
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -1199,7 +1200,18 @@ public class GridNioServer<T> {
                 req = (NioOperationFuture<?>)ses.pollFuture();
 
                 if (req == null && buf.position() == 0) {
-                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                    if (ses.processWrite.get()) {
+                        boolean set = ses.processWrite.compareAndSet(true, false);
+
+                        assert set;
+
+                        if (ses.writeQueue().isEmpty()) {
+                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        }
+                        else {
+                            ses.processWrite.set(true);
+                        }
+                    }
 
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7d198e1/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 88721ff..ee494b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.util.Collection;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -47,7 +48,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     private final int selectorIdx;
 
     /** Size counter. */
-    private final AtomicInteger queueSize = new AtomicInteger();
+    //private final AtomicInteger queueSize = new AtomicInteger();
 
     /** Semaphore. */
     @GridToStringExclude
@@ -68,6 +69,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Logger. */
     private final IgniteLogger log;
 
+    public final AtomicBoolean processWrite = new AtomicBoolean();
+
     /**
      * Creates session instance.
      *
@@ -173,7 +176,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert res : "Future was not added to queue";
 
-        return queueSize.incrementAndGet();
+        return 0;//queueSize.incrementAndGet();
     }
 
     /**
@@ -198,7 +201,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert res : "Future was not added to queue";
 
-        return queueSize.incrementAndGet();
+        return 0;//queueSize.incrementAndGet();
     }
 
     /**
@@ -211,9 +214,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert add;
 
-        boolean set = queueSize.compareAndSet(0, futs.size());
+        //boolean set = queueSize.compareAndSet(0, futs.size());
 
-        assert set;
+        //assert set;
     }
 
     /**
@@ -223,7 +226,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         GridNioFuture<?> last = queue.poll();
 
         if (last != null) {
-            queueSize.decrementAndGet();
+            //queueSize.decrementAndGet();
 
             if (sem != null && !last.messageThread())
                 sem.release();
@@ -264,7 +267,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @return Number of write requests.
      */
     int writeQueueSize() {
-        return queueSize.get();
+        return queue.sizex();//queueSize.get();
     }
 
     /**