You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/13 11:15:20 UTC

[2/2] ignite git commit: Ignite-perftest - Prepare optimizations for merge.

Ignite-perftest - Prepare optimizations for merge.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e58604a4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e58604a4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e58604a4

Branch: refs/heads/ignite-perftest-merge
Commit: e58604a4aa6c2c0ec9a756ac40a5aae4af5621bc
Parents: d12674a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Nov 13 13:15:05 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 13 13:15:05 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalGatewayImpl.java  |   8 +-
 .../managers/communication/GridIoManager.java   |  52 +-
 .../processors/cache/GridCacheGateway.java      |  25 +-
 .../processors/cache/GridCacheIoManager.java    |  34 +-
 .../processors/cache/GridCacheMapEntry.java     |  16 +-
 .../processors/cache/GridCacheMvcc.java         |   8 -
 .../distributed/dht/GridDhtLockFuture.java      |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   3 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   3 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java |  12 -
 .../clock/GridClockSyncProcessor.java           |   2 +-
 .../internal/util/GridSpinReadWriteLock.java    | 522 +++++++++----------
 .../ignite/internal/util/nio/GridNioServer.java |  12 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  42 +-
 16 files changed, 356 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index dbf2f73..fe8c580 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -73,13 +73,13 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
         if (stackTrace == null)
             stackTrace = stackTrace();
 
-//        rwLock.readLock();
+        rwLock.readLock();
 
         GridKernalState state = this.state.get();
 
         if (state != GridKernalState.STARTED) {
             // Unlock just acquired lock.
-//            rwLock.readUnlock();
+            rwLock.readUnlock();
 
             if (state == GridKernalState.DISCONNECTED) {
                 assert reconnectFut != null;
@@ -96,7 +96,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
         if (stackTrace == null)
             stackTrace = stackTrace();
 
-//        rwLock.readLock();
+        rwLock.readLock();
 
         if (state.get() == GridKernalState.DISCONNECTED)
             throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
@@ -104,7 +104,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
 
     /** {@inheritDoc} */
     @Override public void readUnlock() {
-//        rwLock.readUnlock();
+        rwLock.readUnlock();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a14a05a..b8af8da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -167,10 +167,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final Marshaller marsh;
 
     /** Busy lock. */
-//    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
 
     /** Lock to sync maps access. */
-//    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     /** Fully started flag. When set to true, can send and receive messages. */
     private volatile boolean started;
@@ -396,7 +396,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                         }
 
                         // Clean up delayed and ordered messages (need exclusive lock).
-//                        lock.writeLock().lock();
+                        lock.writeLock().lock();
 
                         try {
                             ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
@@ -406,7 +406,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                                     "(sender node left topology): " + waitList);
                         }
                         finally {
-//                            lock.writeLock().unlock();
+                            lock.writeLock().unlock();
                         }
 
                         break;
@@ -424,7 +424,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         // 1. Process wait list.
         Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>();
 
-//        lock.writeLock().lock();
+        lock.writeLock().lock();
 
         try {
             started = true;
@@ -442,7 +442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
         finally {
-//            lock.writeLock().unlock();
+            lock.writeLock().unlock();
         }
 
         // After write lock released.
@@ -501,19 +501,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean interrupted = false;
 
         // Busy wait is intentional.
-//        while (true) {
-//            try {
-//                if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
-//                    break;
-//                else
-//                    Thread.sleep(200);
-//            }
-//            catch (InterruptedException ignore) {
-//                // Preserve interrupt status & ignore.
-//                // Note that interrupted flag is cleared.
-//                interrupted = true;
-//            }
-//        }
+        while (true) {
+            try {
+                if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+                    break;
+                else
+                    Thread.sleep(200);
+            }
+            catch (InterruptedException ignore) {
+                // Preserve interrupt status & ignore.
+                // Note that interrupted flag is cleared.
+                interrupted = true;
+            }
+        }
 
         try {
             if (interrupted)
@@ -529,7 +529,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             stopping = true;
         }
         finally {
-//            busyLock.writeUnlock();
+            busyLock.writeUnlock();
         }
     }
 
@@ -553,7 +553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert nodeId != null;
         assert msg != null;
 
-//        busyLock.readLock();
+        busyLock.readLock();
 
         try {
             if (stopping) {
@@ -581,7 +581,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
 
             if (!started) {
-//                lock.readLock().lock();
+                lock.readLock().lock();
 
                 try {
                     if (!started) { // Sets to true in write lock, so double checking.
@@ -601,7 +601,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     }
                 }
                 finally {
-//                    lock.readLock().unlock();
+                    lock.readLock().unlock();
                 }
             }
 
@@ -649,7 +649,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             U.error(log, "Failed to process message (will ignore): " + msg, e);
         }
         finally {
-//            busyLock.readUnlock();
+            busyLock.readUnlock();
         }
     }
 
@@ -2001,7 +2001,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-//            busyLock.readLock();
+            busyLock.readLock();
 
             try {
                 if (stopping) {
@@ -2077,7 +2077,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 }
             }
             finally {
-//                busyLock.readUnlock();
+                busyLock.readUnlock();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 8a1f0c3..1562d70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -63,9 +63,9 @@ public class GridCacheGateway<K, V> {
         if (ctx.deploymentEnabled())
             ctx.deploy().onEnter();
 
-//        rwLock.readLock();
+        rwLock.readLock();
 
-//        checkState(true, true);
+        checkState(true, true);
     }
 
     /**
@@ -106,11 +106,9 @@ public class GridCacheGateway<K, V> {
         onEnter();
 
         // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
-//        rwLock.readLock();
-//
-//        return checkState(true, false);
+        rwLock.readLock();
 
-        return true;
+        return checkState(true, false);
     }
 
     /**
@@ -121,8 +119,7 @@ public class GridCacheGateway<K, V> {
     public boolean enterIfNotStoppedNoLock() {
         onEnter();
 
-//        return checkState(false, false);
-        return true;
+        return checkState(false, false);
     }
 
     /**
@@ -145,7 +142,7 @@ public class GridCacheGateway<K, V> {
            leaveNoLock();
         }
         finally {
-//            rwLock.readUnlock();
+            rwLock.readUnlock();
         }
     }
 
@@ -171,9 +168,9 @@ public class GridCacheGateway<K, V> {
 
         onEnter();
 
-//        rwLock.readLock();
-//
-//        checkState(true, true);
+        rwLock.readLock();
+
+        checkState(true, true);
 
         // Must unlock in case of unexpected errors to avoid
         // deadlocks during kernal stop.
@@ -181,7 +178,7 @@ public class GridCacheGateway<K, V> {
             return setOperationContextPerCall(opCtx);
         }
         catch (Throwable e) {
-//            rwLock.readUnlock();
+            rwLock.readUnlock();
 
             throw e;
         }
@@ -222,7 +219,7 @@ public class GridCacheGateway<K, V> {
             leaveNoLock(prev);
         }
         finally {
-//            rwLock.readUnlock();
+            rwLock.readUnlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 1a118a7..2334780 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -101,7 +101,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     private boolean stopping;
 
     /** Mutex. */
-//    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
 
     /** Deployment enabled. */
     private boolean depEnabled;
@@ -218,19 +218,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         boolean interrupted = false;
 
         // Busy wait is intentional.
-//        while (true) {
-//            try {
-//                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
-//                    break;
-//                else
-//                    Thread.sleep(200);
-//            }
-//            catch (InterruptedException ignore) {
-//                // Preserve interrupt status & ignore.
-//                // Note that interrupted flag is cleared.
-//                interrupted = true;
-//            }
-//        }
+        while (true) {
+            try {
+                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+                    break;
+                else
+                    Thread.sleep(200);
+            }
+            catch (InterruptedException ignore) {
+                // Preserve interrupt status & ignore.
+                // Note that interrupted flag is cleared.
+                interrupted = true;
+            }
+        }
 
         if (interrupted)
             Thread.currentThread().interrupt();
@@ -239,7 +239,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             stopping = true;
         }
         finally {
-//            rw.writeUnlock();
+            rw.writeUnlock();
         }
     }
 
@@ -251,7 +251,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage> c) {
-//        rw.readLock();
+        rw.readLock();
 
         try {
             if (stopping) {
@@ -282,7 +282,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (depEnabled)
                 cctx.deploy().ignoreOwnership(false);
 
-//            rw.readUnlock();
+            rw.readUnlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b40ab6a..df9f5c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -298,17 +298,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         CacheObject val0 = val;
 
-        if (val0 == null) {
-            if (hasOffHeapPointer()) {
-                IgniteBiTuple<byte[], Byte> t = valueBytes0();
+        if (val0 == null && hasOffHeapPointer()) {
+            IgniteBiTuple<byte[], Byte> t = valueBytes0();
 
-                return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
-            }
-        }
-        else if (val0 instanceof CacheObjectImpl) {
-            CacheObjectImpl im = (CacheObjectImpl)val0;
-
-            val0 = new CacheObjectImpl(im.val, im.valBytes);
+            return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
         }
 
         return val0;
@@ -2854,8 +2847,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
     /** {@inheritDoc} */
     @Override public KeyCacheObject key() {
-//        return key;
-        return new KeyCacheObjectImpl(((KeyCacheObjectImpl)key).val, ((KeyCacheObjectImpl)key).valBytes);
+        return key;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 543923a..adcbf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -1370,12 +1370,4 @@ public final class GridCacheMvcc {
     @Override public String toString() { // Synchronize to ensure one-thread at a time.
         return S.toString(GridCacheMvcc.class, this);
     }
-
-    public static void main(String[] args) {
-        ArrayList<String> col1 = new ArrayList<>(5);
-
-        for (int i = 0; i < 5; i++) {
-            col1.add("" + i);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index d86a11d..219d841 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -380,10 +380,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      * @throws GridDistributedLockCancelledException If lock is canceled.
-     * @throws IgniteCheckedException If failed.
      */
     @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry)
-        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException {
+        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
         if (log.isDebugEnabled())
             log.debug("Adding entry: " + entry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0afe70b..6bd283a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -686,9 +686,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 
-        if (true)
-            return affNodes;
-
         lock.readLock().lock();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index dbe69f8..e2939b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -985,7 +985,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
             }
         }
-        catch (GridCacheEntryRemovedException e) {
+        catch (GridCacheEntryRemovedException ignore) {
             assert false : "Got removed exception on entry with dht local candidate: " + entries;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 1050086..e268a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import javax.cache.processor.EntryProcessor;
@@ -209,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         this.rmtFutId = rmtFutId;
 
         readMap = Collections.emptyMap();
-        writeMap = new LinkedHashMap<>(U.capacity(txSize), 0.75f);
+        writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
 
         topologyVersion(topVer);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1ca90dd..9c6cb88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -389,9 +389,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
-            MiniFuture fut = new MiniFuture(m);
-
-            add(fut);
+            add(new MiniFuture(m));
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 278f6df..c88546b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -482,18 +482,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
-     * @param nodeId Node ID.
-     * @param dhtVer DHT version.
-     * @param writeVer Write version.
-     */
-    void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion writeVer) {
-        GridDistributedTxMapping m = mappings.get(nodeId);
-
-        if (m != null)
-            m.dhtVersion(dhtVer, writeVer);
-    }
-
-    /**
      * @param nodeId Undo mapping.
      */
     @Override public boolean removeMapping(UUID nodeId) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index a33c35e..b5c89cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -289,7 +289,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
 
         long now = clockSrc.currentTimeMillis();
 
-        if (snap  == null)
+        if (snap == null)
             return now;
 
         Long delta = snap.deltas().get(ctx.localNodeId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 115fd80..a1fa892 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -78,44 +78,44 @@ public class GridSpinReadWriteLock {
      */
     @SuppressWarnings("BusyWait")
     public void readLock() {
-//        int cnt = readLockEntryCnt.get();
-//
-//        // Read lock reentry or acquiring read lock while holding write lock.
-//        if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
-//            assert state > 0 || state == -1;
-//
-//            readLockEntryCnt.set(cnt + 1);
-//
-//            return;
-//        }
-//
-//        boolean interrupted = false;
-//
-//        while (true) {
-//            int cur = state;
-//
-//            assert cur >= -1;
-//
-//            if (cur == -1 || pendingWLocks > 0) {
-//                try {
-//                    Thread.sleep(10);
-//                }
-//                catch (InterruptedException ignored) {
-//                    interrupted = true;
-//                }
-//
-//                continue;
-//            }
-//
-//            if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
-//                if (interrupted)
-//                    Thread.currentThread().interrupt();
-//
-//                break;
-//            }
-//        }
-//
-//        readLockEntryCnt.set(1);
+        int cnt = readLockEntryCnt.get();
+
+        // Read lock reentry or acquiring read lock while holding write lock.
+        if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+            assert state > 0 || state == -1;
+
+            readLockEntryCnt.set(cnt + 1);
+
+            return;
+        }
+
+        boolean interrupted = false;
+
+        while (true) {
+            int cur = state;
+
+            assert cur >= -1;
+
+            if (cur == -1 || pendingWLocks > 0) {
+                try {
+                    Thread.sleep(10);
+                }
+                catch (InterruptedException ignored) {
+                    interrupted = true;
+                }
+
+                continue;
+            }
+
+            if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+                if (interrupted)
+                    Thread.currentThread().interrupt();
+
+                break;
+            }
+        }
+
+        readLockEntryCnt.set(1);
     }
 
     /**
@@ -124,62 +124,60 @@ public class GridSpinReadWriteLock {
      * @return {@code true} if acquired.
      */
     public boolean tryReadLock() {
-//        int cnt = readLockEntryCnt.get();
-//
-//        // Read lock reentry or acquiring read lock while holding write lock.
-//        if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
-//            assert state > 0 || state == -1;
-//
-//            readLockEntryCnt.set(cnt + 1);
-//
-//            return true;
-//        }
-//
-//        while (true) {
-//            int cur = state;
-//
-//            if (cur == -1 || pendingWLocks > 0)
-//                return false;
-//
-//            if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
-//                readLockEntryCnt.set(1);
-//
-//                return true;
-//            }
-//        }
-
-        return true;
+        int cnt = readLockEntryCnt.get();
+
+        // Read lock reentry or acquiring read lock while holding write lock.
+        if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+            assert state > 0 || state == -1;
+
+            readLockEntryCnt.set(cnt + 1);
+
+            return true;
+        }
+
+        while (true) {
+            int cur = state;
+
+            if (cur == -1 || pendingWLocks > 0)
+                return false;
+
+            if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+                readLockEntryCnt.set(1);
+
+                return true;
+            }
+        }
     }
 
     /**
      * Read unlock.
      */
     public void readUnlock() {
-//        int cnt = readLockEntryCnt.get();
-//
-//        if (cnt == 0)
-//            throw new IllegalMonitorStateException();
-//
-//        // Read unlock when holding write lock is performed here.
-//        if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) {
-//            assert state > 0 || state == -1;
-//
-//            readLockEntryCnt.set(cnt - 1);
-//
-//            return;
-//        }
-//
-//        while (true) {
-//            int cur = state;
-//
-//            assert cur > 0;
-//
-//            if (compareAndSet(STATE_OFFS, cur, cur - 1)) {
-//                readLockEntryCnt.set(0);
-//
-//                return;
-//            }
-//        }
+        int cnt = readLockEntryCnt.get();
+
+        if (cnt == 0)
+            throw new IllegalMonitorStateException();
+
+        // Read unlock when holding write lock is performed here.
+        if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) {
+            assert state > 0 || state == -1;
+
+            readLockEntryCnt.set(cnt - 1);
+
+            return;
+        }
+
+        while (true) {
+            int cur = state;
+
+            assert cur > 0;
+
+            if (compareAndSet(STATE_OFFS, cur, cur - 1)) {
+                readLockEntryCnt.set(0);
+
+                return;
+            }
+        }
     }
 
     /**
@@ -187,95 +185,95 @@ public class GridSpinReadWriteLock {
      */
     @SuppressWarnings("BusyWait")
     public void writeLock() {
-//        long threadId = Thread.currentThread().getId();
-//
-//        if (threadId == writeLockOwner) {
-//            assert state == -1;
-//
-//            writeLockEntryCnt++;
-//
-//            return;
-//        }
-//
-//        // Increment pending write locks.
-//        while (true) {
-//            int pendingWLocks0 = pendingWLocks;
-//
-//            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
-//                break;
-//        }
-//
-//        boolean interrupted = false;
-//
-//        while (!compareAndSet(STATE_OFFS, 0, -1)) {
-//            try {
-//                Thread.sleep(10);
-//            }
-//            catch (InterruptedException ignored) {
-//                interrupted = true;
-//            }
-//        }
-//
-//        // Decrement pending write locks.
-//        while (true) {
-//            int pendingWLocks0 = pendingWLocks;
-//
-//            assert pendingWLocks0 > 0;
-//
-//            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
-//                break;
-//        }
-//
-//        if (interrupted)
-//            Thread.currentThread().interrupt();
-//
-//        assert writeLockOwner == -1;
-//
-//        writeLockOwner = threadId;
-//        writeLockEntryCnt = 1;
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return;
+        }
+
+        // Increment pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+                break;
+        }
+
+        boolean interrupted = false;
+
+        while (!compareAndSet(STATE_OFFS, 0, -1)) {
+            try {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException ignored) {
+                interrupted = true;
+            }
+        }
+
+        // Decrement pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            assert pendingWLocks0 > 0;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+                break;
+        }
+
+        if (interrupted)
+            Thread.currentThread().interrupt();
+
+        assert writeLockOwner == -1;
+
+        writeLockOwner = threadId;
+        writeLockEntryCnt = 1;
     }
 
     /**
      * Acquires write lock without sleeping between unsuccessful attempts.
      */
     public void writeLock0() {
-//        long threadId = Thread.currentThread().getId();
-//
-//        if (threadId == writeLockOwner) {
-//            assert state == -1;
-//
-//            writeLockEntryCnt++;
-//
-//            return;
-//        }
-//
-//        // Increment pending write locks.
-//        while (true) {
-//            int pendingWLocks0 = pendingWLocks;
-//
-//            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
-//                break;
-//        }
-//
-//        for (;;) {
-//            if (compareAndSet(STATE_OFFS, 0, -1))
-//                break;
-//        }
-//
-//        // Decrement pending write locks.
-//        while (true) {
-//            int pendingWLocks0 = pendingWLocks;
-//
-//            assert pendingWLocks0 > 0;
-//
-//            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
-//                break;
-//        }
-//
-//        assert writeLockOwner == -1;
-//
-//        writeLockOwner = threadId;
-//        writeLockEntryCnt = 1;
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return;
+        }
+
+        // Increment pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+                break;
+        }
+
+        for (;;) {
+            if (compareAndSet(STATE_OFFS, 0, -1))
+                break;
+        }
+
+        // Decrement pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            assert pendingWLocks0 > 0;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+                break;
+        }
+
+        assert writeLockOwner == -1;
+
+        writeLockOwner = threadId;
+        writeLockEntryCnt = 1;
     }
 
     /**
@@ -291,28 +289,26 @@ public class GridSpinReadWriteLock {
      * @return {@code True} if write lock has been acquired.
      */
     public boolean tryWriteLock() {
-//        long threadId = Thread.currentThread().getId();
-//
-//        if (threadId == writeLockOwner) {
-//            assert state == -1;
-//
-//            writeLockEntryCnt++;
-//
-//            return true;
-//        }
-//
-//        if (compareAndSet(STATE_OFFS, 0, -1)) {
-//            assert writeLockOwner == -1;
-//
-//            writeLockOwner = threadId;
-//            writeLockEntryCnt = 1;
-//
-//            return true;
-//        }
-//
-//        return false;
-
-        return true;
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return true;
+        }
+
+        if (compareAndSet(STATE_OFFS, 0, -1)) {
+            assert writeLockOwner == -1;
+
+            writeLockOwner = threadId;
+            writeLockEntryCnt = 1;
+
+            return true;
+        }
+
+        return false;
     }
 
     /**
@@ -323,83 +319,81 @@ public class GridSpinReadWriteLock {
      */
     @SuppressWarnings("BusyWait")
     public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException {
-//        long threadId = Thread.currentThread().getId();
-//
-//        if (threadId == writeLockOwner) {
-//            assert state == -1;
-//
-//            writeLockEntryCnt++;
-//
-//            return true;
-//        }
-//
-//        try {
-//            // Increment pending write locks.
-//            while (true) {
-//                int pendingWLocks0 = pendingWLocks;
-//
-//                if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
-//                    break;
-//            }
-//
-//            long end = U.currentTimeMillis() + unit.toMillis(timeout);
-//
-//            while (true) {
-//                if (compareAndSet(STATE_OFFS, 0, -1)) {
-//                    assert writeLockOwner == -1;
-//
-//                    writeLockOwner = threadId;
-//                    writeLockEntryCnt = 1;
-//
-//                    return true;
-//                }
-//
-//                Thread.sleep(10);
-//
-//                if (end <= U.currentTimeMillis())
-//                    return false;
-//            }
-//        }
-//        finally {
-//            // Decrement pending write locks.
-//            while (true) {
-//                int pendingWLocks0 = pendingWLocks;
-//
-//                assert pendingWLocks0 > 0;
-//
-//                if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
-//                    break;
-//            }
-//        }
-
-        return true;
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return true;
+        }
+
+        try {
+            // Increment pending write locks.
+            while (true) {
+                int pendingWLocks0 = pendingWLocks;
+
+                if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+                    break;
+            }
+
+            long end = U.currentTimeMillis() + unit.toMillis(timeout);
+
+            while (true) {
+                if (compareAndSet(STATE_OFFS, 0, -1)) {
+                    assert writeLockOwner == -1;
+
+                    writeLockOwner = threadId;
+                    writeLockEntryCnt = 1;
+
+                    return true;
+                }
+
+                Thread.sleep(10);
+
+                if (end <= U.currentTimeMillis())
+                    return false;
+            }
+        }
+        finally {
+            // Decrement pending write locks.
+            while (true) {
+                int pendingWLocks0 = pendingWLocks;
+
+                assert pendingWLocks0 > 0;
+
+                if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+                    break;
+            }
+        }
     }
 
     /**
      * Releases write lock.
      */
     public void writeUnlock() {
-//        long threadId = Thread.currentThread().getId();
-//
-//        if (threadId != writeLockOwner)
-//            throw new IllegalMonitorStateException();
-//
-//        if (writeLockEntryCnt > 1) {
-//            writeLockEntryCnt--;
-//
-//            return;
-//        }
-//
-//        writeLockEntryCnt = 0;
-//        writeLockOwner = -1;
-//
-//        // Current thread holds write and read locks and is releasing
-//        // write lock now.
-//        int update = readLockEntryCnt.get() > 0 ? 1 : 0;
-//
-//        boolean b = compareAndSet(STATE_OFFS, -1, update);
-//
-//        assert b;
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId != writeLockOwner)
+            throw new IllegalMonitorStateException();
+
+        if (writeLockEntryCnt > 1) {
+            writeLockEntryCnt--;
+
+            return;
+        }
+
+        writeLockEntryCnt = 0;
+        writeLockOwner = -1;
+
+        // If the current thread also holds a read lock, downgrade to it
+        // (state 1) instead of fully releasing the lock (state 0).
+        int update = readLockEntryCnt.get() > 0 ? 1 : 0;
+
+        boolean b = compareAndSet(STATE_OFFS, -1, update);
+
+        assert b;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index fb17cd7..f1aa4a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -424,7 +424,7 @@ public class GridNioServer<T> {
         assert ses != null;
         assert fut != null;
 
-        boolean wakeup = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+        int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
 
         IgniteInClosure<IgniteException> ackClosure;
 
@@ -432,17 +432,17 @@ public class GridNioServer<T> {
             fut.ackClosure(ackClosure);
 
         if (ses.closed()) {
-            fut.connectionClosed();
+            if (ses.removeFuture(fut))
+                fut.connectionClosed();
         }
-        else if (wakeup)
+        else if (msgCnt == 1)
             // Change from 0 to 1 means that worker thread should be waken up.
             clientWorkers.get(ses.selectorIndex()).offer(fut);
 
         IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
 
         if (lsnr0 != null)
-            // TODO ignite-perftest pass correct queue size.
-            lsnr0.apply(ses, 0);
+            lsnr0.apply(ses, msgCnt);
     }
 
     /**
@@ -1383,7 +1383,7 @@ public class GridNioServer<T> {
 
                     long now = U.currentTimeMillis();
 
-                    if (U.currentTimeMillis() - lastIdleCheck > 5000) {
+                    if (now - lastIdleCheck > 5000) {
                         lastIdleCheck = now;
 
                         checkIdle(selector.keys());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a2b7565..6b1f6a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -21,14 +21,14 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  * Session implementation bound to selector API and socket API.
@@ -37,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
  */
 class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Pending write requests. */
-    private final ConcurrentLinkedQueue<GridNioFuture<?>> queue = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
 
     /** Selection key associated with this session. */
     @GridToStringExclude
@@ -47,7 +47,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     private final int selectorIdx;
 
     /** Size counter. */
-    private final AtomicBoolean wakeupSelector = new AtomicBoolean();
+    private final AtomicInteger queueSize = new AtomicInteger();
 
     /** Semaphore. */
     @GridToStringExclude
@@ -163,14 +163,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request.
      * @return Updated size of the queue.
      */
-    boolean offerSystemFuture(GridNioFuture<?> writeFut) {
+    int offerSystemFuture(GridNioFuture<?> writeFut) {
         writeFut.messageThread(true);
 
-        boolean res = queue.offer(writeFut);
+        boolean res = queue.offerFirst(writeFut);
 
         assert res : "Future was not added to queue";
 
-        return !wakeupSelector.get() && wakeupSelector.compareAndSet(false, true);
+        return queueSize.incrementAndGet();
     }
 
     /**
@@ -183,7 +183,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request to add.
      * @return Updated size of the queue.
      */
-    boolean offerFuture(GridNioFuture<?> writeFut) {
+    int offerFuture(GridNioFuture<?> writeFut) {
         boolean msgThread = GridNioBackPressureControl.threadProcessingMessage();
 
         if (sem != null && !msgThread)
@@ -195,7 +195,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert res : "Future was not added to queue";
 
-        return !wakeupSelector.get() && wakeupSelector.compareAndSet(false, true);
+        return queueSize.incrementAndGet();
     }
 
     /**
@@ -208,7 +208,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert add;
 
-        wakeupSelector.set(false);
+        boolean set = queueSize.compareAndSet(0, futs.size());
+
+        assert set;
     }
 
     /**
@@ -217,13 +219,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     @Nullable GridNioFuture<?> pollFuture() {
         GridNioFuture<?> last = queue.poll();
 
-        if (last == null) {
-            wakeupSelector.set(false);
-
-            last = queue.poll();
-        }
-
         if (last != null) {
+            queueSize.decrementAndGet();
+
             if (sem != null && !last.messageThread())
                 sem.release();
 
@@ -248,12 +246,22 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
+     * @param fut Future to remove from the pending write queue.
+     * @return {@code True} if future was removed from queue.
+     */
+    boolean removeFuture(GridNioFuture<?> fut) {
+        assert closed();
+
+        return queue.removeLastOccurrence(fut);
+    }
+
+    /**
      * Gets number of write requests in a queue that have not been processed yet.
      *
      * @return Number of write requests.
      */
     int writeQueueSize() {
-        return queue.size();
+        return queueSize.get();
     }
 
     /** {@inheritDoc} */