You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/02/02 15:17:46 UTC
incubator-ignite git commit: #ignite-138: Implemented.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-138 [created] eb74c5ed6
#ignite-138: Implemented.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eb74c5ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eb74c5ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eb74c5ed
Branch: refs/heads/ignite-138
Commit: eb74c5ed625eb0dff30b0c4f31a9d3c6084bd937
Parents: ff7ee6d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Feb 2 17:17:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Feb 2 17:17:08 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 15 +++-
.../cache/GridCacheEvictionManager.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 74 ++++----------------
.../GridCachePartitionExchangeManager.java | 5 +-
...ridCacheOptimisticCheckPreparedTxFuture.java | 4 +-
...dCachePessimisticCheckCommittedTxFuture.java | 2 +-
.../distributed/dht/GridDhtCacheAdapter.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 5 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 9 ++-
.../distributed/dht/GridDhtTxFinishFuture.java | 5 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 3 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 9 ++-
.../dht/GridPartitionedGetFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 6 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../colocated/GridDhtColocatedLockFuture.java | 5 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../preloader/GridDhtPartitionDemandPool.java | 2 +-
.../preloader/GridDhtPartitionSupplyPool.java | 20 ++----
.../GridDhtPartitionsExchangeFuture.java | 8 ++-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../distributed/near/GridNearGetFuture.java | 2 +-
.../distributed/near/GridNearLockFuture.java | 5 +-
.../near/GridNearTransactionalCache.java | 4 +-
.../near/GridNearTxFinishFuture.java | 3 +-
.../near/GridNearTxPrepareFuture.java | 3 +-
.../query/GridCacheDistributedQueryFuture.java | 2 +-
.../query/GridCacheDistributedQueryManager.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 7 ++
.../cache/transactions/IgniteTxEx.java | 6 ++
.../cache/transactions/IgniteTxHandler.java | 6 +-
33 files changed, 99 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f624f8..d3fa2c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -62,10 +62,11 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CachePreloadMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
/**
* Cache context.
@@ -184,6 +185,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** System cache flag. */
private boolean sys;
+ /** IO policy. */
+ private GridIoPolicy plc;
+
/** Default expiry policy. */
private ExpiryPolicy expiryPlc;
@@ -301,6 +305,8 @@ public class GridCacheContext<K, V> implements Externalizable {
sys = CU.UTILITY_CACHE_NAME.equals(cacheName);
+ plc = sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
+
Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
expiryPlc = factory != null ? factory.create() : null;
@@ -366,6 +372,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return IO policy for the given cache.
+ */
+ public GridIoPolicy ioPolicy() {
+ return plc;
+ }
+
+ /**
* @param cache Cache.
*/
public void cache(GridCacheAdapter<K, V> cache) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 837a6e4..ef63038 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -470,7 +470,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
*/
private void sendEvictionResponse(UUID nodeId, GridCacheEvictionResponse<K, V> res) {
try {
- cctx.io().send(nodeId, res);
+ cctx.io().send(nodeId, res, cctx.ioPolicy());
if (log.isDebugEnabled())
log.debug("Sent eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() +
@@ -1732,7 +1732,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
log.debug("Sending eviction request [node=" + nodeId + ", req=" + req + ']');
try {
- cctx.io().send(nodeId, req);
+ cctx.io().send(nodeId, req, cctx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
// Node left the topology.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 3b7e2a0..359236d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -35,7 +35,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
* Cache communication manager.
@@ -183,6 +182,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param cacheMsg Cache message.
* @param c Handler closure.
*/
+ @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage<K, V> cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) {
rw.readLock();
@@ -336,18 +336,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @throws IgniteCheckedException If sending failed.
* @throws ClusterTopologyException If receiver left.
*/
- public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
- send(node, msg, SYSTEM_POOL);
- }
-
- /**
- * Sends communication message.
- *
- * @param node Node to send the message to.
- * @param msg Message to send.
- * @throws IgniteCheckedException If sending failed.
- * @throws ClusterTopologyException If receiver left.
- */
+ @SuppressWarnings("unchecked")
public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
assert !node.isLocal();
@@ -399,13 +388,14 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
*
* @param nodes Nodes to send to.
* @param msg Message to send.
+ * @param plc IO policy.
* @param fallback Callback for failed nodes.
* @return {@code True} if nodes are empty or message was sent, {@code false} if
* all nodes have left topology while sending this message.
* @throws IgniteCheckedException If send failed.
*/
- @SuppressWarnings( {"BusyWait"})
- public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage<K, V> msg,
+ @SuppressWarnings({"BusyWait", "unchecked"})
+ public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage<K, V> msg, GridIoPolicy plc,
@Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
assert nodes != null;
assert msg != null;
@@ -445,7 +435,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
else
msg0 = (GridCacheMessage<K, V>)msg.clone();
- cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL);
+ cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, plc);
boolean added = false;
@@ -527,29 +517,12 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param msg Message to send.
* @throws IgniteCheckedException If sending failed.
*/
- public void send(UUID nodeId, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
- ClusterNode n = cctx.discovery().node(nodeId);
-
- if (n == null)
- throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" +
- msg + ']');
-
- send(n, msg);
- }
-
- /**
- * Sends communication message.
- *
- * @param nodeId ID of node to send the message to.
- * @param msg Message to send.
- * @throws IgniteCheckedException If sending failed.
- */
public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
ClusterNode n = cctx.discovery().node(nodeId);
if (n == null)
- throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" +
- msg + ']');
+ throw new ClusterTopologyException("Failed to send message because node left grid [nodeId=" + nodeId +
+ ", msg=" + msg + ']');
send(n, msg, plc);
}
@@ -558,10 +531,11 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
+ * @param plc IO policy.
* @param timeout Timeout to keep a message on receiving queue.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg,
+ public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg, GridIoPolicy plc,
long timeout) throws IgniteCheckedException {
onSend(msg, node.id());
@@ -571,7 +545,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
try {
cnt++;
- cctx.gridIO().sendOrderedMessage(node, topic, msg, SYSTEM_POOL, timeout, false);
+ cctx.gridIO().sendOrderedMessage(node, topic, msg, plc, timeout, false);
if (log.isDebugEnabled())
log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg +
@@ -671,30 +645,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
}
/**
- * Removes message handler.
- *
- * @param type Type of message.
- * @param c Handler.
- */
- public void removeHandler(Class<?> type, IgniteBiInClosure<UUID, ?> c) {
- assert type != null;
- assert c != null;
-
- boolean res = clsHandlers.remove(type, c);
-
- if (log != null && log.isDebugEnabled()) {
- if (res) {
- log.debug("Removed cache communication handler " +
- "[type=" + type + ", handler=" + c + ']');
- }
- else {
- log.debug("Cache communication handler is not registered " +
- "[type=" + type + ", handler=" + c + ']');
- }
- }
- }
-
- /**
* Adds ordered message handler.
*
* @param topic Topic.
@@ -736,7 +686,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param cacheMsg Message.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("ErrorNotRethrown")
+ @SuppressWarnings({"ErrorNotRethrown", "unchecked"})
private void unmarshall(UUID nodeId, GridCacheMessage<K, V> cacheMsg) throws IgniteCheckedException {
if (cctx.localNodeId().equals(nodeId))
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 835aa39..fa7b5d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
/**
@@ -484,7 +485,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
- cctx.io().safeSend(nodes, m, null);
+ cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
return true;
}
@@ -508,7 +509,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
try {
- cctx.io().send(node, m);
+ cctx.io().send(node, m, SYSTEM_POOL);
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index dbfbb2c..567fd37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -157,7 +157,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
nodeTransactions(id), futureId(), fut.futureId());
try {
- cctx.io().send(id, req);
+ cctx.io().send(id, req, tx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
fut.onNodeLeft();
@@ -178,7 +178,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
nodeTransactions(nodeId), futureId(), fut.futureId());
try {
- cctx.io().send(nodeId, req);
+ cctx.io().send(nodeId, req, tx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
fut.onNodeLeft();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
index 2afa7a4..56cf33b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
@@ -148,7 +148,7 @@ public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompou
add(fut);
try {
- cctx.io().send(rmtNode.id(), req);
+ cctx.io().send(rmtNode.id(), req, tx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
fut.onNodeLeft();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c9681c8..de49e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -642,7 +642,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
res.invalidPartitions(fut.invalidPartitions(), ctx.discovery().topologyVersion());
try {
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, ctx.ioPolicy());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
@@ -717,7 +717,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) {
try {
- ctx.io().send(req.getKey(), req.getValue());
+ ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy());
}
catch (IgniteCheckedException e) {
log.error("Failed to send TTL update request.", e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f0da6b4..06bebd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -41,7 +41,6 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.dr.GridDrType.*;
/**
@@ -883,7 +882,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
if (log.isDebugEnabled())
log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
- cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(n, req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
@@ -946,7 +945,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
log.debug("Sending DHT lock request to near node [node=" + n.id() +
", req=" + req + ']');
- cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(n, req, cctx.ioPolicy());
}
catch (ClusterTopologyException e) {
fut.onResult(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 64c0811..f72f62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -41,7 +41,6 @@ import java.util.*;
import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
@@ -438,7 +437,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (res != null) {
try {
// Reply back to sender.
- ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ ctx.io().send(nodeId, res, ctx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId);
@@ -1107,7 +1106,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
// Don't send reply message to this node or if lock was cancelled.
if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class))
- ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ ctx.io().send(nearNode, res, ctx.ioPolicy());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" +
@@ -1428,7 +1427,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.completedVersions(committed, rolledback);
- ctx.io().send(n, req);
+ ctx.io().send(n, req, ctx.ioPolicy());
}
catch (ClusterTopologyException ignore) {
if (log.isDebugEnabled())
@@ -1456,7 +1455,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.completedVersions(committed, rolledback);
- ctx.io().send(n, req);
+ ctx.io().send(n, req, ctx.ioPolicy());
}
catch (ClusterTopologyException ignore) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index b5c7927..eff678a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -37,7 +37,6 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
*
@@ -351,7 +350,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(n, req, tx.ioPolicy());
if (sync)
res = true;
@@ -416,7 +415,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
if (sync)
res = true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 4d62ecf..78e0c45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -37,7 +37,6 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
/**
@@ -633,7 +632,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
nearFinMiniId, err);
try {
- cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(nearNodeId, res, ioPolicy());
}
catch (ClusterTopologyException ignored) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 947bec4..d918449 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -40,7 +40,6 @@ import java.util.concurrent.atomic.*;
import static org.apache.ignite.transactions.IgniteTxState.*;
import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
*
@@ -287,7 +286,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t);
try {
- cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e);
@@ -400,7 +399,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
res.pending(localDhtPendingVersions(tx.writeEntries(), min));
- cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
}
return true;
@@ -690,7 +689,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
//noinspection TryWithIdenticalCatches
try {
- cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(n, req, tx.ioPolicy());
}
catch (ClusterTopologyException e) {
fut.onResult(e);
@@ -744,7 +743,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
//noinspection TryWithIdenticalCatches
try {
- cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
}
catch (ClusterTopologyException e) {
fut.onResult(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 94217f4..2d0ce60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -393,7 +393,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
add(fut); // Append new future.
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9b7c788..efd952b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2528,7 +2528,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC)
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, ctx.ioPolicy());
else {
// No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
sendDeferredUpdateResponse(nodeId, req.futureVersion());
@@ -2639,7 +2639,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse<K, V> res) {
try {
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, ctx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
U.warn(log, "Failed to send near update reply to node because it left grid: " +
@@ -2922,7 +2922,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.gate().enter();
try {
- ctx.io().send(nodeId, msg);
+ ctx.io().send(nodeId, msg, ctx.ioPolicy());
}
finally {
ctx.gate().leave();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3b7e79d..336a9d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -352,7 +352,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
if (log.isDebugEnabled())
log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req);
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (ClusterTopologyException ignored) {
U.warn(log, "Failed to send update request to backup node because it left grid: " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 5cd8d54..e604c9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -790,7 +790,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req);
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
onDone(new GridCacheReturn<V>(null, true));
@@ -824,7 +824,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req);
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
addFailedKeys(req.keys(), e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7811713..504e2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -503,7 +503,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys()))
// We don't wait for reply to this message.
- ctx.io().send(n, req);
+ ctx.io().send(n, req, ctx.ioPolicy());
}
}
catch (IgniteCheckedException ex) {
@@ -585,7 +585,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
req.completedVersions(committed, rolledback);
// We don't wait for reply to this message.
- ctx.io().send(n, req);
+ ctx.io().send(n, req, ctx.ioPolicy());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index e0edcad..c76fb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -44,7 +44,6 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
* Colocated cache lock future.
@@ -870,7 +869,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyException ex) {
assert fut != null;
@@ -885,7 +884,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyException ex) {
assert fut != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 16a56e2..cd14126 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -260,7 +260,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
log.debug("Sending force key request [cacheName=" + cctx.name() + "node=" + n.id() +
", req=" + req + ']');
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index fde727e..f9cc203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -613,7 +613,7 @@ public class GridDhtPartitionDemandPool<K, V> {
log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
// Send demand message.
- cctx.io().send(node, d);
+ cctx.io().send(node, d, cctx.ioPolicy());
// While.
// =====
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 865a16e..52a4ed5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -245,9 +245,6 @@ class GridDhtPartitionSupplyPool<K, V> {
boolean ack = false;
- // If demander node left grid.
- boolean nodeLeft = false;
-
boolean convertPortable = cctx.portableEnabled() && cctx.offheapTiered();
try {
@@ -300,11 +297,8 @@ class GridDhtPartitionSupplyPool<K, V> {
if (s.messageSize() >= cctx.config().getPreloadBatchSize()) {
ack = true;
- if (!reply(node, d, s)) {
- nodeLeft = true;
-
+ if (!reply(node, d, s))
return;
- }
// Throttle preloading.
if (preloadThrottle > 0)
@@ -355,11 +349,8 @@ class GridDhtPartitionSupplyPool<K, V> {
if (s.messageSize() >= cctx.config().getPreloadBatchSize()) {
ack = true;
- if (!reply(node, d, s)) {
- nodeLeft = true;
-
+ if (!reply(node, d, s))
return;
- }
// Throttle preloading.
if (preloadThrottle > 0)
@@ -453,11 +444,8 @@ class GridDhtPartitionSupplyPool<K, V> {
if (s.messageSize() >= cctx.config().getPreloadBatchSize()) {
ack = true;
- if (!reply(node, d, s)) {
- nodeLeft = true;
-
+ if (!reply(node, d, s))
return;
- }
s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(),
cctx.cacheId());
@@ -510,7 +498,7 @@ class GridDhtPartitionSupplyPool<K, V> {
if (log.isDebugEnabled())
log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
- cctx.io().sendOrderedMessage(n, d.topic(), s, d.timeout());
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 219737f..c49e439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -41,6 +41,8 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
/**
* Future for exchanging partition maps.
*/
@@ -558,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
- cctx.io().send(node, m);
+ cctx.io().send(node, m, SYSTEM_POOL);
}
/**
@@ -583,7 +585,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
", exchId=" + exchId + ", msg=" + m + ']');
- cctx.io().safeSend(nodes, m, null);
+ cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
}
/**
@@ -997,7 +999,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
if (!remaining.isEmpty()) {
try {
cctx.io().safeSend(cctx.discovery().nodes(remaining),
- new GridDhtPartitionsSingleRequest<K, V>(exchId), null);
+ new GridDhtPartitionsSingleRequest<K, V>(exchId), SYSTEM_POOL, null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to request partitions from nodes [exchangeId=" + exchId +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d5e9714..97e0ce8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -379,7 +379,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
if (log.isDebugEnabled())
log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
- cctx.io().send(node, res);
+ cctx.io().send(node, res, cctx.ioPolicy());
}
catch (ClusterTopologyException ignore) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index bd2f1d8..25b1ce6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -399,7 +399,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
add(fut); // Append new future.
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d1b56e5..5e370aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -43,7 +43,6 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
* Cache lock future.
@@ -1120,7 +1119,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyException ex) {
assert fut != null;
@@ -1135,7 +1134,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyException ex) {
assert fut != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index d790ac3..268e6b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -577,7 +577,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
dht.removeLocks(ctx.nodeId(), req.version(), locKeys, true);
else if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys()))
// We don't wait for reply to this message.
- ctx.io().send(n, req);
+ ctx.io().send(n, req, ctx.ioPolicy());
}
}
catch (IgniteCheckedException ex) {
@@ -680,7 +680,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
req.completedVersions(committed, rolledback);
// We don't wait for reply to this message.
- ctx.io().send(n, req);
+ ctx.io().send(n, req, ctx.ioPolicy());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index ae0efd5..c714db2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -38,7 +38,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
/**
@@ -386,7 +385,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
cctx.tm().beforeFinishRemote(n.id(), tx.threadId());
try {
- cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(n, req, tx.ioPolicy());
// If we don't wait for result, then mark future as done.
if (!isSync() && !m.explicitLock())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 8df91e0..f658715 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -41,7 +41,6 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
/**
@@ -652,7 +651,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
add(fut); // Append new future.
try {
- cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(n, req, tx.ioPolicy());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index b80b2b6..83b1053 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -112,7 +112,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
});
if (!nodes.isEmpty()) {
- cctx.io().safeSend(nodes, req,
+ cctx.io().safeSend(nodes, req, cctx.ioPolicy(),
new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
onNodeLeft(node.id());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index cb7e9eb..431af1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -273,6 +273,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
node,
topic,
res,
+ cctx.ioPolicy(),
timeout > 0 ? timeout : Long.MAX_VALUE);
return true;
@@ -417,6 +418,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
/** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
@Override protected boolean onFieldsPageReady(boolean loc, GridCacheQueryInfo qryInfo,
@Nullable List<GridQueryFieldMetadata> metadata,
@Nullable Collection<?> entities,
@@ -687,7 +689,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
// For example, a remote reducer has a state, we should not serialize and then send
// the reducer changed by the local node.
if (!F.isEmpty(rmtNodes)) {
- cctx.io().safeSend(rmtNodes, req, new P1<ClusterNode>() {
+ cctx.io().safeSend(rmtNodes, req, cctx.ioPolicy(), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
fut.onNodeLeft(node.id());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 10843ec..9400636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
import static org.apache.ignite.transactions.IgniteTxIsolation.*;
@@ -419,6 +421,11 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public GridIoPolicy ioPolicy() {
+ return sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean storeUsed() {
return storeEnabled() && store() != null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
index 63e4786..da40532 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.lang.*;
@@ -78,6 +79,11 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject {
public boolean system();
/**
+ * @return Pool where message for the given transaction must be processed.
+ */
+ public GridIoPolicy ioPolicy();
+
+ /**
* @return Last recorded topology version.
*/
public long topologyVersion();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index fb94cd2..8c8da1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -484,7 +484,7 @@ public class IgniteTxHandler<K, V> {
req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
try {
- ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ ctx.io().send(nodeId, res, tx.ioPolicy());
}
catch (Throwable e) {
// Double-check.
@@ -1406,7 +1406,7 @@ public class IgniteTxHandler<K, V> {
if (log.isDebugEnabled())
log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, SYSTEM_POOL);
}
catch (ClusterTopologyException ignored) {
if (log.isDebugEnabled())
@@ -1506,7 +1506,7 @@ public class IgniteTxHandler<K, V> {
if (log.isDebugEnabled())
log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, SYSTEM_POOL);
}
catch (ClusterTopologyException ignored) {
if (log.isDebugEnabled())