You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/21 12:31:21 UTC
[1/3] ignite git commit: WIP.
Repository: ignite
Updated Branches:
refs/heads/ignite-put-experimental [created] 207a4c917
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee363b9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee363b9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee363b9f
Branch: refs/heads/ignite-put-experimental
Commit: ee363b9fc0f5035d3f976a25e58426979578e6d8
Parents: 865e376
Author: thatcoach <pp...@list.ru>
Authored: Sat Mar 19 23:50:53 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Sat Mar 19 23:50:53 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 55 ++++++++--------
.../deployment/GridDeploymentLocalStore.java | 39 ++++++------
.../processors/cache/GridCacheMvccManager.java | 40 ++++++------
.../processors/cache/GridCacheUtils.java | 56 ++++++++--------
.../dht/atomic/GridDhtAtomicCache.java | 43 +++++++------
.../ignite/internal/util/lang/GridFunc.java | 52 +++++++--------
.../ignite/internal/util/nio/GridNioServer.java | 59 ++++++++---------
.../util/nio/GridSelectorNioSessionImpl.java | 15 +++--
.../communication/tcp/TcpCommunicationSpi.java | 67 ++++++++++----------
9 files changed, 216 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9ffbf4e..c4ca984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,27 +17,6 @@
package org.apache.ignite.internal.managers.communication;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -81,7 +60,27 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -124,7 +123,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final Object sysLsnrsMux = new Object();
/** Disconnect listeners. */
- private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
+ private final Collection<GridDisconnectListener> disconnectLsnrs = new LinkedBlockingQueue<>();
/** Map of {@link IoPool}-s injected by Ignite plugins. */
private final IoPool[] ioPools = new IoPool[128];
@@ -164,7 +163,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final long discoDelay;
/** Cache for messages that were received prior to discovery. */
- private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap =
+ private final ConcurrentMap<UUID, LinkedBlockingDeque<DelayedMessage>> waitMap =
new ConcurrentHashMap8<>();
/** Communication message listener. */
@@ -418,7 +417,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
lock.writeLock().lock();
try {
- ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
+ LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(nodeId);
if (log.isDebugEnabled())
log.debug("Removed messages from discovery startup delay list " +
@@ -448,9 +447,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
try {
started = true;
- for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) {
+ for (Entry<UUID, LinkedBlockingDeque<DelayedMessage>> e : waitMap.entrySet()) {
if (ctx.discovery().node(e.getKey()) != null) {
- ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey());
+ LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(e.getKey());
if (log.isDebugEnabled())
log.debug("Processing messages from discovery startup delay list: " + waitList);
@@ -610,7 +609,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
log.debug("Adding message to waiting list [senderId=" + nodeId +
", msg=" + msg + ']');
- ConcurrentLinkedDeque8<DelayedMessage> list =
+ LinkedBlockingDeque<DelayedMessage> list =
F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque());
assert list != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index ab45708..024ba00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,15 +17,6 @@
package org.apache.ignite.internal.managers.deployment;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
@@ -46,7 +37,17 @@ import org.apache.ignite.spi.deployment.DeploymentResource;
import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
@@ -60,7 +61,7 @@ import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
*/
class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** Deployment cache by class name. */
- private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache =
+ private final ConcurrentMap<String, LinkedBlockingDeque<GridDeployment>> cache =
new ConcurrentHashMap8<>();
/** Mutex. */
@@ -110,7 +111,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
Collection<GridDeployment> deps = new ArrayList<>();
synchronized (mux) {
- for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values())
+ for (LinkedBlockingDeque<GridDeployment> depList : cache.values())
for (GridDeployment d : depList)
if (!deps.contains(d))
deps.add(d);
@@ -122,7 +123,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** {@inheritDoc} */
@Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
synchronized (mux) {
- for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values())
+ for (LinkedBlockingDeque<GridDeployment> deps : cache.values())
for (GridDeployment dep : deps)
if (dep.classLoaderId().equals(ldrId))
return dep;
@@ -232,7 +233,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
* @return Deployment.
*/
@Nullable private GridDeployment deployment(String alias) {
- ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias);
+ LinkedBlockingDeque<GridDeployment> deps = cache.get(alias);
if (deps != null) {
GridDeployment dep = deps.peekFirst();
@@ -260,10 +261,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
boolean fireEvt = false;
try {
- ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null;
+ LinkedBlockingDeque<GridDeployment> cachedDeps = null;
// Find existing class loader info.
- for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) {
+ for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) {
for (GridDeployment d : deps) {
if (d.classLoader() == ldr) {
// Cache class and alias.
@@ -304,7 +305,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
", depMode=" + depMode + ", dep=" + dep + ']';
- ConcurrentLinkedDeque8<GridDeployment> deps =
+ LinkedBlockingDeque<GridDeployment> deps =
F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque());
if (!deps.isEmpty()) {
@@ -512,8 +513,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
Collection<GridDeployment> doomed = new HashSet<>();
synchronized (mux) {
- for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
- ConcurrentLinkedDeque8<GridDeployment> deps = i1.next();
+ for (Iterator<LinkedBlockingDeque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
+ LinkedBlockingDeque<GridDeployment> deps = i1.next();
for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
GridDeployment dep = i2.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index afba4bc..b3fcd73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,21 +17,8 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -64,7 +51,20 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -116,7 +116,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
/** Finish futures. */
- private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+ private final LinkedBlockingDeque<FinishLockFuture> finishFuts = new LinkedBlockingDeque<>();
/** Nested listener calls. */
private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() {
@@ -233,7 +233,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
else if (log.isDebugEnabled())
log.debug("Failed to find transaction for changed owner: " + owner);
- if (!finishFuts.isEmptyx()) {
+ if (!finishFuts.isEmpty()) {
for (FinishLockFuture f : finishFuts)
f.recheck(entry);
}
@@ -964,7 +964,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
X.println(">>> lockedSize: " + locked.size());
X.println(">>> futsSize: " + (mvccFuts.size() + futs.size()));
X.println(">>> near2dhtSize: " + near2dht.size());
- X.println(">>> finishFutsSize: " + finishFuts.sizex());
+ X.println(">>> finishFutsSize: " + finishFuts.size());
}
/**
@@ -984,7 +984,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
- if (!finishFuts.isEmptyx()) {
+ if (!finishFuts.isEmpty()) {
for (FinishLockFuture fut : finishFuts) {
if (fut.topologyVersion().equals(topVer))
cands.putAll(fut.pendingLocks());
@@ -1096,7 +1096,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (exchLog.isDebugEnabled())
exchLog.debug("Rechecking pending locks for completion.");
- if (!finishFuts.isEmptyx()) {
+ if (!finishFuts.isEmpty()) {
for (FinishLockFuture fut : finishFuts)
fut.recheck();
}
@@ -1133,7 +1133,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
Collection<GridCacheMvccCandidate> locs = entry.localCandidates();
if (!F.isEmpty(locs)) {
- Collection<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<>();
+ Collection<GridCacheMvccCandidate> cands = new LinkedBlockingQueue<>();
cands.addAll(F.view(locs, versionFilter()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 1cdd303..04a8a07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -17,32 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.Cache;
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -55,7 +29,6 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
@@ -103,6 +76,33 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheWriterException;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -797,7 +797,7 @@ public class GridCacheUtils {
*/
public static <T> IgniteReducer<T, Collection<T>> objectsReducer() {
return new IgniteReducer<T, Collection<T>>() {
- private final Collection<T> ret = new ConcurrentLinkedQueue<>();
+ private final Collection<T> ret = new LinkedBlockingQueue<>();
@Override public boolean collect(T item) {
if (item != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e908c05..eec5271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -17,24 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -107,7 +89,26 @@ import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
@@ -3175,7 +3176,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private AtomicBoolean guard = new AtomicBoolean(false);
/** Response versions. */
- private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
+ private LinkedBlockingDeque<GridCacheVersion> respVers = new LinkedBlockingDeque<>();
/** Node ID. */
private final UUID nodeId;
@@ -3242,7 +3243,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
respVers.add(ver);
- if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
+ if (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
snd = true;
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 0678657..e10b4ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -17,29 +17,6 @@
package org.apache.ignite.internal.util.lang;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.RandomAccess;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResult;
@@ -77,6 +54,31 @@ import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ThreadLocalRandom8;
+import javax.cache.Cache;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.RandomAccess;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* Contains factory and utility methods for {@code closures}, {@code predicates}, and {@code tuples}.
* It also contains functional style collection comprehensions.
@@ -2332,8 +2334,8 @@ public class GridFunc {
* time its {@link org.apache.ignite.lang.IgniteOutClosure#apply()} method is called.
*/
@SuppressWarnings("unchecked")
- public static <T> IgniteCallable<ConcurrentLinkedDeque8<T>> newDeque() {
- return (IgniteCallable<ConcurrentLinkedDeque8<T>>)DEQUE_FACTORY;
+ public static <T> IgniteCallable<LinkedBlockingDeque<T>> newDeque() {
+ return (IgniteCallable<LinkedBlockingDeque<T>>)DEQUE_FACTORY;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 42c7ac7..4cc06a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -17,6 +17,31 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+import sun.nio.ch.DirectBuffer;
+
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
@@ -43,31 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.ConnectorConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-import sun.nio.ch.DirectBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
@@ -1113,7 +1114,7 @@ public class GridNioServer<T> {
*/
private void writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh)
throws IOException {
- ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+ LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
assert queue != null;
@@ -1248,7 +1249,7 @@ public class GridNioServer<T> {
*/
private abstract class AbstractNioClientWorker extends GridWorker {
/** Queue of change requests on this selector. */
- private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+ private final LinkedBlockingQueue<NioOperationFuture> changeReqs = new LinkedBlockingQueue<>();
/** Selector to select read events. */
private Selector selector;
@@ -2255,7 +2256,7 @@ public class GridNioServer<T> {
/** {@inheritDoc} */
@Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
if (directMode && sslFilter != null)
- ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedQueue<>());
+ ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new LinkedBlockingQueue<>());
proceedSessionOpened(ses);
}
@@ -2276,7 +2277,7 @@ public class GridNioServer<T> {
boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
if (sslSys) {
- ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+ LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
assert queue != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 1241f99..0b68d5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -17,18 +17,19 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Collection;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
/**
* Session implementation bound to selector API and socket API.
@@ -37,7 +38,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
*/
class GridSelectorNioSessionImpl extends GridNioSessionImpl {
/** Pending write requests. */
- private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
+ private final LinkedBlockingDeque<GridNioFuture<?>> queue = new LinkedBlockingDeque<>();
/** Selection key associated with this session. */
@GridToStringExclude
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b283b82..0611f46 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -17,37 +17,6 @@
package org.apache.ignite.spi.communication.tcp;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -126,9 +95,41 @@ import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.LongAdder8;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -803,7 +804,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private ShmemAcceptWorker shmemAcceptWorker;
/** Shared memory workers. */
- private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
+ private final Collection<ShmemWorker> shmemWorkers = new LinkedBlockingDeque<>();
/** Clients. */
private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
[2/3] ignite git commit: Merge branch 'master' into
vozerov-perf-experimental
Posted by vo...@apache.org.
Merge branch 'master' into vozerov-perf-experimental
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8045382
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8045382
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8045382
Branch: refs/heads/ignite-put-experimental
Commit: e8045382e7bbdc4f9f148213e332c9baea3b7093
Parents: ee363b9 69f526a
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 21 14:22:35 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 21 14:22:35 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/igfs/IgfsPath.java | 23 +++++++-
.../ignite/internal/binary/BinaryContext.java | 43 +++++++++++++++
.../ignite/internal/binary/BinaryUtils.java | 37 +++++++++++++
.../processors/igfs/IgfsDirectoryInfo.java | 27 +++++++++-
.../internal/processors/igfs/IgfsEntryInfo.java | 35 ++++++++++++
.../processors/igfs/IgfsFileAffinityRange.java | 32 +++++++++--
.../internal/processors/igfs/IgfsFileInfo.java | 37 ++++++++++++-
.../internal/processors/igfs/IgfsFileMap.java | 41 ++++++++++++--
.../processors/igfs/IgfsListingEntry.java | 25 ++++++++-
.../meta/IgfsMetaDirectoryCreateProcessor.java | 55 ++++++++++++++++++-
.../IgfsMetaDirectoryListingAddProcessor.java | 41 +++++++++++++-
...IgfsMetaDirectoryListingRemoveProcessor.java | 49 ++++++++++++++++-
...gfsMetaDirectoryListingReplaceProcessor.java | 48 ++++++++++++++++-
.../igfs/meta/IgfsMetaFileCreateProcessor.java | 56 +++++++++++++++++++-
.../igfs/meta/IgfsMetaFileLockProcessor.java | 46 +++++++++++++++-
.../meta/IgfsMetaFileRangeDeleteProcessor.java | 39 +++++++++++++-
.../meta/IgfsMetaFileRangeUpdateProcessor.java | 41 +++++++++++++-
.../meta/IgfsMetaFileReserveSpaceProcessor.java | 49 ++++++++++++++++-
.../igfs/meta/IgfsMetaFileUnlockProcessor.java | 45 +++++++++++++++-
.../igfs/meta/IgfsMetaUpdatePathProcessor.java | 40 +++++++++++++-
.../meta/IgfsMetaUpdatePropertiesProcessor.java | 45 +++++++++++++++-
.../igfs/meta/IgfsMetaUpdateTimesProcessor.java | 47 +++++++++++++++-
22 files changed, 874 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
[3/3] ignite git commit: Fixes.
Posted by vo...@apache.org.
Fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/207a4c91
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/207a4c91
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/207a4c91
Branch: refs/heads/ignite-put-experimental
Commit: 207a4c917c298ca6660ba2400348e59163d8f6d5
Parents: e804538
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 21 14:28:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 21 14:28:33 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 2 +-
.../deployment/GridDeploymentLocalStore.java | 41 ++++++++++----------
.../ignite/internal/util/lang/GridFunc.java | 26 ++++++++++++-
3 files changed, 46 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/207a4c91/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c4ca984..5b7ecb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -2238,7 +2238,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** */
@GridToStringInclude
- private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<>();
+ private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new LinkedBlockingDeque<>();
/** */
private final AtomicBoolean reserved = new AtomicBoolean();
http://git-wip-us.apache.org/repos/asf/ignite/blob/207a4c91/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index 024ba00..efe8173 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,6 +17,15 @@
package org.apache.ignite.internal.managers.deployment;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
@@ -37,17 +46,7 @@ import org.apache.ignite.spi.deployment.DeploymentResource;
import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingDeque;
+import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
@@ -61,7 +60,7 @@ import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
*/
class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** Deployment cache by class name. */
- private final ConcurrentMap<String, LinkedBlockingDeque<GridDeployment>> cache =
+ private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache =
new ConcurrentHashMap8<>();
/** Mutex. */
@@ -111,7 +110,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
Collection<GridDeployment> deps = new ArrayList<>();
synchronized (mux) {
- for (LinkedBlockingDeque<GridDeployment> depList : cache.values())
+ for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values())
for (GridDeployment d : depList)
if (!deps.contains(d))
deps.add(d);
@@ -123,7 +122,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** {@inheritDoc} */
@Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
synchronized (mux) {
- for (LinkedBlockingDeque<GridDeployment> deps : cache.values())
+ for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values())
for (GridDeployment dep : deps)
if (dep.classLoaderId().equals(ldrId))
return dep;
@@ -233,7 +232,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
* @return Deployment.
*/
@Nullable private GridDeployment deployment(String alias) {
- LinkedBlockingDeque<GridDeployment> deps = cache.get(alias);
+ ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias);
if (deps != null) {
GridDeployment dep = deps.peekFirst();
@@ -261,10 +260,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
boolean fireEvt = false;
try {
- LinkedBlockingDeque<GridDeployment> cachedDeps = null;
+ ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null;
// Find existing class loader info.
- for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) {
+ for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) {
for (GridDeployment d : deps) {
if (d.classLoader() == ldr) {
// Cache class and alias.
@@ -305,8 +304,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
", depMode=" + depMode + ", dep=" + dep + ']';
- LinkedBlockingDeque<GridDeployment> deps =
- F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque());
+ ConcurrentLinkedDeque8<GridDeployment> deps =
+ F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque2());
if (!deps.isEmpty()) {
for (GridDeployment d : deps) {
@@ -513,8 +512,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
Collection<GridDeployment> doomed = new HashSet<>();
synchronized (mux) {
- for (Iterator<LinkedBlockingDeque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
- LinkedBlockingDeque<GridDeployment> deps = i1.next();
+ for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
+ ConcurrentLinkedDeque8<GridDeployment> deps = i1.next();
for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
GridDeployment dep = i2.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/207a4c91/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index e10b4ed..2c02dc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -143,7 +143,18 @@ public class GridFunc {
};
/** */
- private static final IgniteCallable<?> DEQUE_FACTORY = new IgniteCallable<ConcurrentLinkedDeque8>() {
+ private static final IgniteCallable<?> DEQUE_FACTORY = new IgniteCallable<LinkedBlockingDeque>() {
+ @Override public LinkedBlockingDeque call() {
+ return new LinkedBlockingDeque<>();
+ }
+
+ @Override public String toString() {
+ return "Deque factory.";
+ }
+ };
+
+ /** */
+ private static final IgniteCallable<?> DEQUE_FACTORY2 = new IgniteCallable<ConcurrentLinkedDeque8>() {
@Override public ConcurrentLinkedDeque8 call() {
return new ConcurrentLinkedDeque8();
}
@@ -2339,6 +2350,19 @@ public class GridFunc {
}
/**
+ * Returns a factory closure that creates new {@link ConcurrentLinkedDeque8} instance.
+ * Note that this method does not create a new closure but returns a static one.
+ *
+ * @param <T> Element type of the created {@link ConcurrentLinkedDeque8}.
+ * @return Factory closure that creates new {@link ConcurrentLinkedDeque8} instance every
+ * time its {@link IgniteCallable#call()} method is called.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> IgniteCallable<ConcurrentLinkedDeque8<T>> newDeque2() {
+ return (IgniteCallable<ConcurrentLinkedDeque8<T>>)DEQUE_FACTORY2;
+ }
+
+ /**
* Returns a factory closure that creates new {@link List} instance. Note that this
* method does not create a new closure but returns a static one.
*