You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/21 12:31:21 UTC

[1/3] ignite git commit: WIP.

Repository: ignite
Updated Branches:
  refs/heads/ignite-put-experimental [created] 207a4c917


WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee363b9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee363b9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee363b9f

Branch: refs/heads/ignite-put-experimental
Commit: ee363b9fc0f5035d3f976a25e58426979578e6d8
Parents: 865e376
Author: thatcoach <pp...@list.ru>
Authored: Sat Mar 19 23:50:53 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Sat Mar 19 23:50:53 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 55 ++++++++--------
 .../deployment/GridDeploymentLocalStore.java    | 39 ++++++------
 .../processors/cache/GridCacheMvccManager.java  | 40 ++++++------
 .../processors/cache/GridCacheUtils.java        | 56 ++++++++--------
 .../dht/atomic/GridDhtAtomicCache.java          | 43 +++++++------
 .../ignite/internal/util/lang/GridFunc.java     | 52 +++++++--------
 .../ignite/internal/util/nio/GridNioServer.java | 59 ++++++++---------
 .../util/nio/GridSelectorNioSessionImpl.java    | 15 +++--
 .../communication/tcp/TcpCommunicationSpi.java  | 67 ++++++++++----------
 9 files changed, 216 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9ffbf4e..c4ca984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,27 +17,6 @@
 
 package org.apache.ignite.internal.managers.communication;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -81,7 +60,27 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -124,7 +123,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final Object sysLsnrsMux = new Object();
 
     /** Disconnect listeners. */
-    private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
+    private final Collection<GridDisconnectListener> disconnectLsnrs = new LinkedBlockingQueue<>();
 
     /** Map of {@link IoPool}-s injected by Ignite plugins. */
     private final IoPool[] ioPools = new IoPool[128];
@@ -164,7 +163,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final long discoDelay;
 
     /** Cache for messages that were received prior to discovery. */
-    private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap =
+    private final ConcurrentMap<UUID, LinkedBlockingDeque<DelayedMessage>> waitMap =
         new ConcurrentHashMap8<>();
 
     /** Communication message listener. */
@@ -418,7 +417,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                         lock.writeLock().lock();
 
                         try {
-                            ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
+                            LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(nodeId);
 
                             if (log.isDebugEnabled())
                                 log.debug("Removed messages from discovery startup delay list " +
@@ -448,9 +447,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         try {
             started = true;
 
-            for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) {
+            for (Entry<UUID, LinkedBlockingDeque<DelayedMessage>> e : waitMap.entrySet()) {
                 if (ctx.discovery().node(e.getKey()) != null) {
-                    ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey());
+                    LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(e.getKey());
 
                     if (log.isDebugEnabled())
                         log.debug("Processing messages from discovery startup delay list: " + waitList);
@@ -610,7 +609,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                             log.debug("Adding message to waiting list [senderId=" + nodeId +
                                 ", msg=" + msg + ']');
 
-                        ConcurrentLinkedDeque8<DelayedMessage> list =
+                        LinkedBlockingDeque<DelayedMessage> list =
                             F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque());
 
                         assert list != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index ab45708..024ba00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,15 +17,6 @@
 
 package org.apache.ignite.internal.managers.deployment;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskName;
@@ -46,7 +37,17 @@ import org.apache.ignite.spi.deployment.DeploymentResource;
 import org.apache.ignite.spi.deployment.DeploymentSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
 import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
@@ -60,7 +61,7 @@ import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
  */
 class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
     /** Deployment cache by class name. */
-    private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache =
+    private final ConcurrentMap<String, LinkedBlockingDeque<GridDeployment>> cache =
         new ConcurrentHashMap8<>();
 
     /** Mutex. */
@@ -110,7 +111,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
         Collection<GridDeployment> deps = new ArrayList<>();
 
         synchronized (mux) {
-            for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values())
+            for (LinkedBlockingDeque<GridDeployment> depList : cache.values())
                 for (GridDeployment d : depList)
                     if (!deps.contains(d))
                         deps.add(d);
@@ -122,7 +123,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
     /** {@inheritDoc} */
     @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
         synchronized (mux) {
-            for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values())
+            for (LinkedBlockingDeque<GridDeployment> deps : cache.values())
                 for (GridDeployment dep : deps)
                     if (dep.classLoaderId().equals(ldrId))
                         return dep;
@@ -232,7 +233,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
      * @return Deployment.
      */
     @Nullable private GridDeployment deployment(String alias) {
-        ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias);
+        LinkedBlockingDeque<GridDeployment> deps = cache.get(alias);
 
         if (deps != null) {
             GridDeployment dep = deps.peekFirst();
@@ -260,10 +261,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
             boolean fireEvt = false;
 
             try {
-                ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null;
+                LinkedBlockingDeque<GridDeployment> cachedDeps = null;
 
                 // Find existing class loader info.
-                for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) {
+                for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) {
                     for (GridDeployment d : deps) {
                         if (d.classLoader() == ldr) {
                             // Cache class and alias.
@@ -304,7 +305,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
                 assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
                     ", depMode=" + depMode + ", dep=" + dep + ']';
 
-                ConcurrentLinkedDeque8<GridDeployment> deps =
+                LinkedBlockingDeque<GridDeployment> deps =
                     F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque());
 
                 if (!deps.isEmpty()) {
@@ -512,8 +513,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
         Collection<GridDeployment> doomed = new HashSet<>();
 
         synchronized (mux) {
-            for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
-                ConcurrentLinkedDeque8<GridDeployment> deps = i1.next();
+            for (Iterator<LinkedBlockingDeque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
+                LinkedBlockingDeque<GridDeployment> deps = i1.next();
 
                 for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
                     GridDeployment dep = i2.next();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index afba4bc..b3fcd73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,21 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -64,7 +51,20 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -116,7 +116,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
 
     /** Finish futures. */
-    private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+    private final LinkedBlockingDeque<FinishLockFuture> finishFuts = new LinkedBlockingDeque<>();
 
     /** Nested listener calls. */
     private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() {
@@ -233,7 +233,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         else if (log.isDebugEnabled())
             log.debug("Failed to find transaction for changed owner: " + owner);
 
-        if (!finishFuts.isEmptyx()) {
+        if (!finishFuts.isEmpty()) {
             for (FinishLockFuture f : finishFuts)
                 f.recheck(entry);
         }
@@ -964,7 +964,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         X.println(">>>   lockedSize: " + locked.size());
         X.println(">>>   futsSize: " + (mvccFuts.size() + futs.size()));
         X.println(">>>   near2dhtSize: " + near2dht.size());
-        X.println(">>>   finishFutsSize: " + finishFuts.sizex());
+        X.println(">>>   finishFutsSize: " + finishFuts.size());
     }
 
     /**
@@ -984,7 +984,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
         Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
 
-        if (!finishFuts.isEmptyx()) {
+        if (!finishFuts.isEmpty()) {
             for (FinishLockFuture fut : finishFuts) {
                 if (fut.topologyVersion().equals(topVer))
                     cands.putAll(fut.pendingLocks());
@@ -1096,7 +1096,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         if (exchLog.isDebugEnabled())
             exchLog.debug("Rechecking pending locks for completion.");
 
-        if (!finishFuts.isEmptyx()) {
+        if (!finishFuts.isEmpty()) {
             for (FinishLockFuture fut : finishFuts)
                 fut.recheck();
         }
@@ -1133,7 +1133,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     Collection<GridCacheMvccCandidate> locs = entry.localCandidates();
 
                     if (!F.isEmpty(locs)) {
-                        Collection<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<>();
+                        Collection<GridCacheMvccCandidate> cands = new LinkedBlockingQueue<>();
 
                         cands.addAll(F.view(locs, versionFilter()));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 1cdd303..04a8a07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -17,32 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.Cache;
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -55,7 +29,6 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
@@ -103,6 +76,33 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheWriterException;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -797,7 +797,7 @@ public class GridCacheUtils {
      */
     public static <T> IgniteReducer<T, Collection<T>> objectsReducer() {
         return new IgniteReducer<T, Collection<T>>() {
-            private final Collection<T> ret = new ConcurrentLinkedQueue<>();
+            private final Collection<T> ret = new LinkedBlockingQueue<>();
 
             @Override public boolean collect(T item) {
                 if (item != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e908c05..eec5271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -17,24 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -107,7 +89,26 @@ import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
@@ -3175,7 +3176,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private AtomicBoolean guard = new AtomicBoolean(false);
 
         /** Response versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
+        private LinkedBlockingDeque<GridCacheVersion> respVers = new LinkedBlockingDeque<>();
 
         /** Node ID. */
         private final UUID nodeId;
@@ -3242,7 +3243,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 respVers.add(ver);
 
-                if  (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
+                if  (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
                     snd = true;
             }
             finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 0678657..e10b4ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -17,29 +17,6 @@
 
 package org.apache.ignite.internal.util.lang;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.RandomAccess;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobResult;
@@ -77,6 +54,31 @@ import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.ThreadLocalRandom8;
 
+import javax.cache.Cache;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.RandomAccess;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * Contains factory and utility methods for {@code closures}, {@code predicates}, and {@code tuples}.
  * It also contains functional style collection comprehensions.
@@ -2332,8 +2334,8 @@ public class GridFunc {
      *      time its {@link org.apache.ignite.lang.IgniteOutClosure#apply()} method is called.
      */
     @SuppressWarnings("unchecked")
-    public static <T> IgniteCallable<ConcurrentLinkedDeque8<T>> newDeque() {
-        return (IgniteCallable<ConcurrentLinkedDeque8<T>>)DEQUE_FACTORY;
+    public static <T> IgniteCallable<LinkedBlockingDeque<T>> newDeque() {
+        return (IgniteCallable<LinkedBlockingDeque<T>>)DEQUE_FACTORY;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 42c7ac7..4cc06a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -17,6 +17,31 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+import sun.nio.ch.DirectBuffer;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
@@ -43,31 +68,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.ConnectorConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-import sun.nio.ch.DirectBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
@@ -1113,7 +1114,7 @@ public class GridNioServer<T> {
          */
         private void writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh)
             throws IOException {
-            ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+            LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
 
             assert queue != null;
 
@@ -1248,7 +1249,7 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker {
         /** Queue of change requests on this selector. */
-        private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+        private final LinkedBlockingQueue<NioOperationFuture> changeReqs = new LinkedBlockingQueue<>();
 
         /** Selector to select read events. */
         private Selector selector;
@@ -2255,7 +2256,7 @@ public class GridNioServer<T> {
         /** {@inheritDoc} */
         @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
             if (directMode && sslFilter != null)
-                ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedQueue<>());
+                ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new LinkedBlockingQueue<>());
 
             proceedSessionOpened(ses);
         }
@@ -2276,7 +2277,7 @@ public class GridNioServer<T> {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
                 if (sslSys) {
-                    ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+                    LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
 
                     assert queue != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 1241f99..0b68d5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -17,18 +17,19 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.util.Collection;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  * Session implementation bound to selector API and socket API.
@@ -37,7 +38,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Pending write requests. */
-    private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
+    private final LinkedBlockingDeque<GridNioFuture<?>> queue = new LinkedBlockingDeque<>();
 
     /** Selection key associated with this session. */
     @GridToStringExclude

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b283b82..0611f46 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -17,37 +17,6 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -126,9 +95,41 @@ import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.LongAdder8;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 
@@ -803,7 +804,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private ShmemAcceptWorker shmemAcceptWorker;
 
     /** Shared memory workers. */
-    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
+    private final Collection<ShmemWorker> shmemWorkers = new LinkedBlockingDeque<>();
 
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();


[2/3] ignite git commit: Merge branch 'master' into vozerov-perf-experimental

Posted by vo...@apache.org.
Merge branch 'master' into vozerov-perf-experimental


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8045382
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8045382
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8045382

Branch: refs/heads/ignite-put-experimental
Commit: e8045382e7bbdc4f9f148213e332c9baea3b7093
Parents: ee363b9 69f526a
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 21 14:22:35 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 21 14:22:35 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/igfs/IgfsPath.java   | 23 +++++++-
 .../ignite/internal/binary/BinaryContext.java   | 43 +++++++++++++++
 .../ignite/internal/binary/BinaryUtils.java     | 37 +++++++++++++
 .../processors/igfs/IgfsDirectoryInfo.java      | 27 +++++++++-
 .../internal/processors/igfs/IgfsEntryInfo.java | 35 ++++++++++++
 .../processors/igfs/IgfsFileAffinityRange.java  | 32 +++++++++--
 .../internal/processors/igfs/IgfsFileInfo.java  | 37 ++++++++++++-
 .../internal/processors/igfs/IgfsFileMap.java   | 41 ++++++++++++--
 .../processors/igfs/IgfsListingEntry.java       | 25 ++++++++-
 .../meta/IgfsMetaDirectoryCreateProcessor.java  | 55 ++++++++++++++++++-
 .../IgfsMetaDirectoryListingAddProcessor.java   | 41 +++++++++++++-
 ...IgfsMetaDirectoryListingRemoveProcessor.java | 49 ++++++++++++++++-
 ...gfsMetaDirectoryListingReplaceProcessor.java | 48 ++++++++++++++++-
 .../igfs/meta/IgfsMetaFileCreateProcessor.java  | 56 +++++++++++++++++++-
 .../igfs/meta/IgfsMetaFileLockProcessor.java    | 46 +++++++++++++++-
 .../meta/IgfsMetaFileRangeDeleteProcessor.java  | 39 +++++++++++++-
 .../meta/IgfsMetaFileRangeUpdateProcessor.java  | 41 +++++++++++++-
 .../meta/IgfsMetaFileReserveSpaceProcessor.java | 49 ++++++++++++++++-
 .../igfs/meta/IgfsMetaFileUnlockProcessor.java  | 45 +++++++++++++++-
 .../igfs/meta/IgfsMetaUpdatePathProcessor.java  | 40 +++++++++++++-
 .../meta/IgfsMetaUpdatePropertiesProcessor.java | 45 +++++++++++++++-
 .../igfs/meta/IgfsMetaUpdateTimesProcessor.java | 47 +++++++++++++++-
 22 files changed, 874 insertions(+), 27 deletions(-)
----------------------------------------------------------------------



[3/3] ignite git commit: Fixes.

Posted by vo...@apache.org.
Fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/207a4c91
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/207a4c91
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/207a4c91

Branch: refs/heads/ignite-put-experimental
Commit: 207a4c917c298ca6660ba2400348e59163d8f6d5
Parents: e804538
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 21 14:28:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 21 14:28:33 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  2 +-
 .../deployment/GridDeploymentLocalStore.java    | 41 ++++++++++----------
 .../ignite/internal/util/lang/GridFunc.java     | 26 ++++++++++++-
 3 files changed, 46 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/207a4c91/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c4ca984..5b7ecb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -2238,7 +2238,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         /** */
         @GridToStringInclude
-        private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<>();
+        private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new LinkedBlockingDeque<>();
 
         /** */
         private final AtomicBoolean reserved = new AtomicBoolean();

http://git-wip-us.apache.org/repos/asf/ignite/blob/207a4c91/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index 024ba00..efe8173 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,6 +17,15 @@
 
 package org.apache.ignite.internal.managers.deployment;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskName;
@@ -37,17 +46,7 @@ import org.apache.ignite.spi.deployment.DeploymentResource;
 import org.apache.ignite.spi.deployment.DeploymentSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingDeque;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
 import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
@@ -61,7 +60,7 @@ import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
  */
 class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
     /** Deployment cache by class name. */
-    private final ConcurrentMap<String, LinkedBlockingDeque<GridDeployment>> cache =
+    private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache =
         new ConcurrentHashMap8<>();
 
     /** Mutex. */
@@ -111,7 +110,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
         Collection<GridDeployment> deps = new ArrayList<>();
 
         synchronized (mux) {
-            for (LinkedBlockingDeque<GridDeployment> depList : cache.values())
+            for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values())
                 for (GridDeployment d : depList)
                     if (!deps.contains(d))
                         deps.add(d);
@@ -123,7 +122,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
     /** {@inheritDoc} */
     @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
         synchronized (mux) {
-            for (LinkedBlockingDeque<GridDeployment> deps : cache.values())
+            for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values())
                 for (GridDeployment dep : deps)
                     if (dep.classLoaderId().equals(ldrId))
                         return dep;
@@ -233,7 +232,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
      * @return Deployment.
      */
     @Nullable private GridDeployment deployment(String alias) {
-        LinkedBlockingDeque<GridDeployment> deps = cache.get(alias);
+        ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias);
 
         if (deps != null) {
             GridDeployment dep = deps.peekFirst();
@@ -261,10 +260,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
             boolean fireEvt = false;
 
             try {
-                LinkedBlockingDeque<GridDeployment> cachedDeps = null;
+                ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null;
 
                 // Find existing class loader info.
-                for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) {
+                for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) {
                     for (GridDeployment d : deps) {
                         if (d.classLoader() == ldr) {
                             // Cache class and alias.
@@ -305,8 +304,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
                 assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
                     ", depMode=" + depMode + ", dep=" + dep + ']';
 
-                LinkedBlockingDeque<GridDeployment> deps =
-                    F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque());
+                ConcurrentLinkedDeque8<GridDeployment> deps =
+                    F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque2());
 
                 if (!deps.isEmpty()) {
                     for (GridDeployment d : deps) {
@@ -513,8 +512,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
         Collection<GridDeployment> doomed = new HashSet<>();
 
         synchronized (mux) {
-            for (Iterator<LinkedBlockingDeque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
-                LinkedBlockingDeque<GridDeployment> deps = i1.next();
+            for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
+                ConcurrentLinkedDeque8<GridDeployment> deps = i1.next();
 
                 for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
                     GridDeployment dep = i2.next();

http://git-wip-us.apache.org/repos/asf/ignite/blob/207a4c91/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index e10b4ed..2c02dc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -143,7 +143,18 @@ public class GridFunc {
     };
 
     /** */
-    private static final IgniteCallable<?> DEQUE_FACTORY = new IgniteCallable<ConcurrentLinkedDeque8>() {
+    private static final IgniteCallable<?> DEQUE_FACTORY = new IgniteCallable<LinkedBlockingDeque>() {
+        @Override public LinkedBlockingDeque call() {
+            return new LinkedBlockingDeque<>();
+        }
+
+        @Override public String toString() {
+            return "Deque factory.";
+        }
+    };
+
+    /** */
+    private static final IgniteCallable<?> DEQUE_FACTORY2 = new IgniteCallable<ConcurrentLinkedDeque8>() {
         @Override public ConcurrentLinkedDeque8 call() {
             return new ConcurrentLinkedDeque8();
         }
@@ -2339,6 +2350,19 @@ public class GridFunc {
     }
 
     /**
+     * Returns a factory closure that creates new {@link ConcurrentLinkedDeque8} instance.
+     * Note that this method does not create a new closure but returns a static one.
+     *
+     * @param <T> Type parameters for the created {@link List}.
+     * @return Factory closure that creates new {@link List} instance every
+     *      time its {@link org.apache.ignite.lang.IgniteOutClosure#apply()} method is called.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> IgniteCallable<ConcurrentLinkedDeque8<T>> newDeque2() {
+        return (IgniteCallable<ConcurrentLinkedDeque8<T>>)DEQUE_FACTORY2;
+    }
+
+    /**
      * Returns a factory closure that creates new {@link List} instance. Note that this
      * method does not create a new closure but returns a static one.
      *