You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/19 04:22:00 UTC
[3/8] incubator-ignite git commit: GG-9141 - Fixes for DR
transactions.
GG-9141 - Fixes for DR transactions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/88a2d8da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/88a2d8da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/88a2d8da
Branch: refs/heads/ignite-1
Commit: 88a2d8da1ca94a6e217c796cc958dfbfc9393eca
Parents: a70cfa2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Dec 17 18:57:14 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Dec 17 18:57:14 2014 -0800
----------------------------------------------------------------------
.../gridgain/grid/kernal/GridKernalContext.java | 2 +-
.../grid/kernal/IgniteTransactionsEx.java | 33 +++++++++++++
.../processors/cache/GridCacheAdapter.java | 2 +
.../processors/cache/GridCacheContext.java | 26 ++++++----
.../processors/cache/GridCacheIoManager.java | 32 ++++++++-----
.../processors/cache/GridCacheTxAdapter.java | 14 ++++++
.../kernal/processors/cache/GridCacheTxEx.java | 9 ++++
.../processors/cache/GridCacheTxHandler.java | 25 ++++++----
.../cache/GridCacheTxLocalAdapter.java | 4 +-
.../processors/cache/GridCacheTxManager.java | 2 +
.../kernal/processors/cache/GridCacheUtils.java | 13 +++--
.../GridDistributedTxFinishRequest.java | 26 ++++++++++
.../GridDistributedTxPrepareRequest.java | 24 ++++++++++
.../GridDistributedTxRemoteAdapter.java | 3 ++
.../distributed/dht/GridDhtLockFuture.java | 6 +--
.../dht/GridDhtTransactionalCacheAdapter.java | 11 +++--
.../distributed/dht/GridDhtTxFinishFuture.java | 7 ++-
.../distributed/dht/GridDhtTxFinishRequest.java | 50 ++++++++++----------
.../cache/distributed/dht/GridDhtTxLocal.java | 9 ++--
.../distributed/dht/GridDhtTxLocalAdapter.java | 8 ++--
.../distributed/dht/GridDhtTxPrepareFuture.java | 9 ++--
.../dht/GridDhtTxPrepareRequest.java | 48 +++++++++----------
.../cache/distributed/dht/GridDhtTxRemote.java | 12 +++--
.../colocated/GridDhtColocatedLockFuture.java | 5 +-
.../distributed/near/GridNearLockFuture.java | 5 +-
.../near/GridNearTransactionalCache.java | 3 +-
.../near/GridNearTxFinishFuture.java | 5 +-
.../near/GridNearTxFinishRequest.java | 24 +++++-----
.../cache/distributed/near/GridNearTxLocal.java | 5 +-
.../near/GridNearTxPrepareFuture.java | 3 +-
.../near/GridNearTxPrepareRequest.java | 32 ++++++-------
.../distributed/near/GridNearTxRemote.java | 12 +++--
.../processors/cache/local/GridLocalTx.java | 4 +-
.../transactions/IgniteTransactionsImpl.java | 41 ++++++++++++----
.../cache/jta/GridCacheJtaManager.java | 15 +++---
35 files changed, 367 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
index 7b161a4..e1b9d92 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
@@ -483,7 +483,7 @@ public interface GridKernalContext extends GridMetadataAware, Iterable<GridCompo
/**
* @param name Plugin name.
* @return Plugin provider instance.
- * @throws org.apache.ignite.plugin.PluginNotFoundException If plugin provider for the given name was not found.
+ * @throws PluginNotFoundException If plugin provider for the given name was not found.
*/
public PluginProvider pluginProvider(String name) throws PluginNotFoundException;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java
new file mode 100644
index 0000000..8666e66
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java
@@ -0,0 +1,33 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.cache.*;
+
+/**
+ * Extended interface to work with system transactions.
+ */
+public interface IgniteTransactionsEx extends IgniteTransactions {
+ /**
+ * Starts transaction with specified isolation, concurrency, timeout, invalidation flag,
+ * and number of participating entries.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout Timeout.
+ * @param txSize Number of entries participating in transaction (may be approximate).
+ * @return New transaction.
+ * @throws IllegalStateException If transaction is already started by this thread.
+ * @throws UnsupportedOperationException If cache is {@link GridCacheAtomicityMode#ATOMIC}.
+ */
+ public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout,
+ int txSize);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 720ff36..b5bd597 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -3603,6 +3603,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
tx = ctx.tm().newTx(
true,
op.single(),
+ ctx.system(),
PESSIMISTIC,
READ_COMMITTED,
tCfg.getDefaultTxTimeout(),
@@ -3677,6 +3678,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
tx = ctx.tm().newTx(
true,
op.single(),
+ ctx.system(),
PESSIMISTIC,
READ_COMMITTED,
ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 98766fb..a15713e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -169,6 +169,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Cache ID. */
private int cacheId;
+ /** System cache flag. */
+ private boolean sys;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -274,6 +277,8 @@ public class GridCacheContext<K, V> implements Externalizable {
}
else
cacheId = 1;
+
+ sys = CU.UTILITY_CACHE_NAME.equals(cacheName);
}
/**
@@ -309,6 +314,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return System cache flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
* @param cache Cache.
*/
public void cache(GridCacheAdapter<K, V> cache) {
@@ -928,8 +940,7 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * Same as {@link GridFunc#isAll(Object, org.apache.ignite.lang.IgnitePredicate[])}, but safely unwraps
- * exceptions.
+ * Same as {@link GridFunc#isAll(Object, IgnitePredicate[])}, but safely unwraps exceptions.
*
* @param e Element.
* @param p Predicates.
@@ -937,14 +948,13 @@ public class GridCacheContext<K, V> implements Externalizable {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"ErrorNotRethrown"})
- public <K, V> boolean isAll(GridCacheEntryEx<K, V> e,
- @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] p) throws IgniteCheckedException {
+ public <K1, V1> boolean isAll(GridCacheEntryEx<K1, V1> e,
+ @Nullable IgnitePredicate<GridCacheEntry<K1, V1>>[] p) throws IgniteCheckedException {
return F.isEmpty(p) || isAll(e.wrap(false), p);
}
/**
- * Same as {@link GridFunc#isAll(Object, org.apache.ignite.lang.IgnitePredicate[])}, but safely unwraps
- * exceptions.
+ * Same as {@link GridFunc#isAll(Object, IgnitePredicate[])}, but safely unwraps exceptions.
*
* @param e Element.
* @param p Predicates.
@@ -1569,7 +1579,7 @@ public class GridCacheContext<K, V> implements Externalizable {
/**
* @param obj Object.
* @return Portable object.
- * @throws org.apache.ignite.portables.PortableException In case of error.
+ * @throws PortableException In case of error.
*/
@Nullable public Object marshalToPortable(@Nullable Object obj) throws PortableException {
assert portableEnabled();
@@ -1634,7 +1644,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param col List to unwrap.
* @return Unwrapped list.
*/
- private ArrayList<Object> unwrapPortables(ArrayList<Object> col) {
+ private Collection<Object> unwrapPortables(ArrayList<Object> col) {
int size = col.size();
for (int i = 0; i < size; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
index a222c32..92cdb9a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
@@ -65,9 +65,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
/** Deployment enabled. */
private boolean depEnabled;
- /** IO policy. */
- private GridIoPolicy plc;
-
/** Message listener. */
private GridMessageListener lsnr = new GridMessageListener() {
@SuppressWarnings("unchecked")
@@ -132,10 +129,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
retryDelay = cctx.gridConfig().getNetworkSendRetryDelay();
retryCnt = cctx.gridConfig().getNetworkSendRetryCount();
- //String cacheName = cctx.name(); TODO GG-9141 how to determine policy?
-
- plc = SYSTEM_POOL; // TODO GG-9141 CU.isDrSystemCache(cacheName) ? DR_POOL : SYSTEM_POOL;
-
depEnabled = cctx.gridDeploy().enabled();
cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr);
@@ -333,7 +326,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param node Node to send the message to.
* @param msg Message to send.
* @throws IgniteCheckedException If sending failed.
- * @throws org.apache.ignite.cluster.ClusterTopologyException If receiver left.
+ * @throws ClusterTopologyException If receiver left.
*/
public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
send(node, msg, SYSTEM_POOL);
@@ -345,7 +338,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param node Node to send the message to.
* @param msg Message to send.
* @throws IgniteCheckedException If sending failed.
- * @throws org.apache.ignite.cluster.ClusterTopologyException If receiver left.
+ * @throws ClusterTopologyException If receiver left.
*/
public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
assert !node.isLocal();
@@ -444,7 +437,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
else
msg0 = (GridCacheMessage<K, V>)msg.clone();
- cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, plc);
+ cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL);
boolean added = false;
@@ -537,6 +530,23 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
}
/**
+ * Sends communication message.
+ *
+ * @param nodeId ID of node to send the message to.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException If sending failed.
+ */
+ public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
+ ClusterNode n = cctx.discovery().node(nodeId);
+
+ if (n == null)
+ throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" +
+ msg + ']');
+
+ send(n, msg, plc);
+ }
+
+ /**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msgId Ordered message ID.
@@ -554,7 +564,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
try {
cnt++;
- cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, plc, timeout, false);
+ cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, SYSTEM_POOL, timeout, false);
if (log.isDebugEnabled())
log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
index 7a32afa..ebd862b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
@@ -117,6 +117,9 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
/** Internal flag. */
protected boolean internal;
+ /** System transaction flag. */
+ private boolean sys;
+
/** */
protected boolean onePhaseCommit;
@@ -202,6 +205,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
* @param implicit Implicit flag.
* @param implicitSingle Implicit with one key flag.
* @param loc Local flag.
+ * @param sys System transaction flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -214,6 +218,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
boolean implicit,
boolean implicitSingle,
boolean loc,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -232,6 +237,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
this.implicit = implicit;
this.implicitSingle = implicitSingle;
this.loc = loc;
+ this.sys = sys;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
@@ -257,6 +263,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
* @param xidVer Transaction ID.
* @param startVer Start version mark.
* @param threadId Thread ID.
+ * @param sys System transaction flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -269,6 +276,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
GridCacheVersion xidVer,
GridCacheVersion startVer,
long threadId,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -282,6 +290,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
this.threadId = threadId;
this.xidVer = xidVer;
this.startVer = startVer;
+ this.sys = sys;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
@@ -394,6 +403,11 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public boolean system() {
+ return sys;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean storeUsed() {
return storeEnabled() && store() != null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
index 877c0f1..0dda62b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
@@ -54,6 +54,15 @@ public interface GridCacheTxEx<K, V> extends GridCacheTx, GridTimeoutObject {
public boolean storeUsed();
/**
+ * Checks if this is system cache transaction. System transactions are isolated from user transactions
+ * because some of the public API methods may be invoked inside user transactions and internally start
+ * system cache transactions.
+ *
+ * @return {@code True} if transaction is started for system cache.
+ */
+ public boolean system();
+
+ /**
* @return Last recorded topology version.
*/
public long topologyVersion();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
index fa85566..88def0e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
@@ -27,6 +27,7 @@ import java.util.*;
import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
@@ -240,6 +241,7 @@ public class GridCacheTxHandler<K, V> {
}
else {
tx = new GridDhtTxLocal<>(
+ ctx,
nearNode.id(),
req.version(),
req.futureId(),
@@ -247,7 +249,7 @@ public class GridCacheTxHandler<K, V> {
req.threadId(),
/*implicit*/false,
/*implicit-single*/false,
- ctx,
+ req.system(),
req.concurrency(),
req.isolation(),
req.timeout(),
@@ -468,7 +470,7 @@ public class GridCacheTxHandler<K, V> {
req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
try {
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (Throwable e) {
// Double-check.
@@ -491,6 +493,7 @@ public class GridCacheTxHandler<K, V> {
// Create transaction and add entries.
tx = ctx.tm().onCreated(
new GridDhtTxLocal<>(
+ ctx,
nodeId,
req.version(),
req.futureId(),
@@ -498,7 +501,7 @@ public class GridCacheTxHandler<K, V> {
req.threadId(),
true,
false, /* we don't know, so assume false. */
- ctx,
+ req.system(),
PESSIMISTIC,
READ_COMMITTED,
/*timeout */0,
@@ -666,7 +669,7 @@ public class GridCacheTxHandler<K, V> {
try {
// Reply back to sender.
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyException) {
@@ -864,7 +867,7 @@ public class GridCacheTxHandler<K, V> {
GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
try {
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (Throwable e) {
// Double-check.
@@ -895,6 +898,7 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridDhtTxRemote<>(
+ ctx,
req.nearNodeId(),
req.futureId(),
nodeId,
@@ -902,11 +906,11 @@ public class GridCacheTxHandler<K, V> {
req.topologyVersion(),
req.version(),
req.commitVersion(),
+ req.system(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
req.timeout(),
- ctx,
req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
req.groupLockKey(),
req.nearXidVersion(),
@@ -1012,18 +1016,19 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridNearTxRemote<>(
+ ctx,
ldr,
nodeId,
req.nearNodeId(),
req.threadId(),
req.version(),
req.commitVersion(),
+ req.system(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
req.timeout(),
req.nearWrites(),
- ctx,
req.txSize(),
req.groupLockKey(),
req.subjectId(),
@@ -1100,6 +1105,7 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridDhtTxRemote<>(
+ ctx,
req.nearNodeId(),
req.futureId(),
nodeId,
@@ -1109,11 +1115,11 @@ public class GridCacheTxHandler<K, V> {
req.topologyVersion(),
req.version(),
/*commitVer*/null,
+ req.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
0,
- ctx,
req.txSize(),
req.groupLockKey(),
req.subjectId(),
@@ -1241,6 +1247,7 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridNearTxRemote<>(
+ ctx,
nodeId,
req.nearNodeId(),
// We can pass null as nearXidVer as transaction will be committed right away.
@@ -1248,11 +1255,11 @@ public class GridCacheTxHandler<K, V> {
req.threadId(),
req.version(),
null,
+ req.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
0,
- ctx,
req.txSize(),
req.groupLockKey(),
req.subjectId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 59f0e97..384b243 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -94,6 +94,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
* @param implicit {@code True} if transaction was implicitly started by the system,
* {@code false} if it was started explicitly by user.
* @param implicitSingle {@code True} if transaction is implicit with only one key.
+ * @param sys System flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -106,6 +107,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -117,7 +119,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, /*local*/true, concurrency, isolation, timeout, invalidate,
+ super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
assert !partLock || grpLockKey != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
index cc37438..cad83e5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
@@ -361,6 +361,7 @@ public class GridCacheTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V
public GridCacheTxLocalAdapter<K, V> newTx(
boolean implicit,
boolean implicitSingle,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -377,6 +378,7 @@ public class GridCacheTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V
cctx,
implicit,
implicitSingle,
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
index d28f728..acd81ef 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
@@ -14,7 +14,6 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
@@ -51,9 +50,6 @@ public class GridCacheUtils {
/** Security system cache name. */
public static final String UTILITY_CACHE_NAME = "gg-sys-cache";
- /** Flag to turn off DHT cache for debugging purposes. */
- public static final boolean DHT_ENABLED = true;
-
/** Default mask name. */
private static final String DEFAULT_MASK_NAME = "<default>";
@@ -1522,6 +1518,15 @@ public class GridCacheUtils {
}
/**
+ * @return Cache ID for utility cache.
+ */
+ public static int utilityCacheId() {
+ int hc = UTILITY_CACHE_NAME.hashCode();
+
+ return hc == 0 ? 1 : hc;
+ }
+
+ /**
* Validates that cache key or cache value implements {@link Externalizable}
*
* @param log Logger used to log warning message.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index d2093c2..15d2ce3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -82,6 +82,9 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
/** Group lock key bytes. */
private byte[] grpLockKeyBytes;
+ /** System flag. */
+ private boolean sys;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -96,6 +99,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
* @param commitVer Commit version.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
+ * @param sys System flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -112,6 +116,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
long threadId,
boolean commit,
boolean invalidate,
+ boolean sys,
boolean syncCommit,
boolean syncRollback,
GridCacheVersion baseVer,
@@ -130,6 +135,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
this.threadId = threadId;
this.commit = commit;
this.invalidate = invalidate;
+ this.sys = sys;
this.syncCommit = syncCommit;
this.syncRollback = syncRollback;
this.baseVer = baseVer;
@@ -164,6 +170,13 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
}
/**
+ * @return System flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
* @return Future ID.
*/
public IgniteUuid futureId() {
@@ -350,6 +363,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
_clone.txSize = txSize;
_clone.grpLockKey = grpLockKey;
_clone.grpLockKeyBytes = grpLockKeyBytes;
+ _clone.sys = sys;
}
/** {@inheritDoc} */
@@ -482,6 +496,11 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
commState.idx++;
+ case 20:
+ if (!commState.putBoolean(sys))
+ return false;
+
+ commState.idx++;
}
return true;
@@ -642,6 +661,13 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
commState.idx++;
+ case 20:
+ if (buf.remaining() < 1)
+ return false;
+
+ sys = commState.getBoolean();
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 791c4f7..6c48627 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -103,6 +103,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
/** */
private byte[] txNodesBytes;
+ /** System flag. */
+ private boolean sys;
+
/**
* Required by {@link Externalizable}.
*/
@@ -135,6 +138,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
timeout = tx.timeout();
invalidate = tx.isInvalidate();
txSize = tx.size();
+ sys = tx.system();
this.reads = reads;
this.writes = writes;
@@ -151,6 +155,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
}
/**
+ * @return System flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
* Adds version to be verified on remote node.
*
* @param key Key for which version is verified.
@@ -415,6 +426,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
_clone.txSize = txSize;
_clone.txNodes = txNodes;
_clone.txNodesBytes = txNodesBytes;
+ _clone.sys = sys;
}
/** {@inheritDoc} */
@@ -553,6 +565,11 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
commState.idx++;
+ case 21:
+ if (!commState.putBoolean(sys))
+ return false;
+
+ commState.idx++;
}
return true;
@@ -725,6 +742,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
commState.idx++;
+ case 21:
+ if (buf.remaining() < 1)
+ return false;
+
+ sys = commState.getBoolean();
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3cd3e2d..f38d483 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -74,6 +74,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
* @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -87,6 +88,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
@@ -102,6 +104,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
xidVer,
ctx.versions().last(),
Thread.currentThread().getId(),
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
index be4153a..0187e5f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
@@ -30,6 +29,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
/**
@@ -862,7 +862,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
if (log.isDebugEnabled())
log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
@@ -924,7 +924,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
log.debug("Sending DHT lock request to near node [node=" + n.id() +
", req=" + req + ']');
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException e) {
fut.onResult(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 3f30801..2113a68 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -29,6 +29,7 @@ import java.util.*;
import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
@@ -179,6 +180,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx == null) {
tx = new GridDhtTxRemote<>(
+ ctx.shared(),
req.nodeId(),
req.futureId(),
nodeId,
@@ -187,11 +189,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.topologyVersion(),
req.version(),
/*commitVer*/null,
+ ctx.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
req.timeout(),
- ctx.shared(),
req.txSize(),
req.groupLockKey(),
req.subjectId(),
@@ -423,7 +425,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (res != null) {
try {
// Reply back to sender.
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ignored) {
U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId);
@@ -748,6 +750,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (req.inTx()) {
if (tx == null) {
tx = new GridDhtTxLocal<>(
+ ctx.shared(),
nearNode.id(),
req.version(),
req.futureId(),
@@ -755,7 +758,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.threadId(),
req.implicitTx(),
req.implicitSingleTx(),
- ctx.shared(),
+ ctx.system(),
PESSIMISTIC,
req.isolation(),
req.timeout(),
@@ -1059,7 +1062,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
// Don't send reply message to this node or if lock was cancelled.
if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class))
- ctx.io().send(nearNode, res);
+ ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index a836ff8..d65de8c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
*
@@ -304,6 +305,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.isolation(),
commit,
tx.isInvalidate(),
+ tx.system(),
tx.isSystemInvalidate(),
tx.syncCommit(),
tx.syncRollback(),
@@ -324,7 +326,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
if (sync)
res = true;
@@ -361,6 +363,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.isolation(),
commit,
tx.isInvalidate(),
+ tx.system(),
tx.isSystemInvalidate(),
tx.syncCommit(),
tx.syncRollback(),
@@ -381,7 +384,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(nearMapping.node(), req);
+ cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
if (sync)
res = true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c5db862..21cccf0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -92,6 +92,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
* @param isolation Transaction isolation.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
+ * @param sys System flag.
* @param sysInvalidate System invalidation flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
@@ -115,6 +116,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
GridCacheTxIsolation isolation,
boolean commit,
boolean invalidate,
+ boolean sys,
boolean sysInvalidate,
boolean syncCommit,
boolean syncRollback,
@@ -131,8 +133,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
@Nullable UUID subjId,
int taskNameHash
) {
- super(xidVer, futId, commitVer, threadId, commit, invalidate, syncCommit, syncRollback, baseVer, committedVers,
- rolledbackVers, txSize, writes, recoverWrites, grpLockKey);
+ super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer,
+ committedVers, rolledbackVers, txSize, writes, recoverWrites, grpLockKey);
assert miniId != null;
assert nearNodeId != null;
@@ -322,25 +324,25 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
}
switch (commState.idx) {
- case 20:
+ case 21:
if (!commState.putEnum(isolation))
return false;
commState.idx++;
- case 21:
+ case 22:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putUuid(nearNodeId))
return false;
commState.idx++;
- case 23:
+ case 24:
if (nearWritesBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearWritesBytes.size()))
@@ -367,13 +369,13 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 24:
+ case 25:
if (!commState.putBoolean(onePhaseCommit))
return false;
commState.idx++;
- case 25:
+ case 26:
if (pendingVers != null) {
if (commState.it == null) {
if (!commState.putInt(pendingVers.size()))
@@ -400,31 +402,31 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 26:
+ case 27:
if (!commState.putBoolean(sysInvalidate))
return false;
commState.idx++;
- case 27:
+ case 28:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 28:
+ case 29:
if (!commState.putCacheVersion(writeVer))
return false;
commState.idx++;
- case 29:
+ case 30:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 30:
+ case 31:
if (!commState.putInt(taskNameHash))
return false;
@@ -444,7 +446,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
return false;
switch (commState.idx) {
- case 20:
+ case 21:
if (buf.remaining() < 1)
return false;
@@ -454,7 +456,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 21:
+ case 22:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -464,7 +466,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 22:
+ case 23:
UUID nearNodeId0 = commState.getUuid();
if (nearNodeId0 == UUID_NOT_READ)
@@ -474,7 +476,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 23:
+ case 24:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -503,7 +505,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 24:
+ case 25:
if (buf.remaining() < 1)
return false;
@@ -511,7 +513,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 25:
+ case 26:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -540,7 +542,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 26:
+ case 27:
if (buf.remaining() < 1)
return false;
@@ -548,7 +550,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 27:
+ case 28:
if (buf.remaining() < 8)
return false;
@@ -556,7 +558,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 28:
+ case 29:
GridCacheVersion writeVer0 = commState.getCacheVersion();
if (writeVer0 == CACHE_VER_NOT_READ)
@@ -566,7 +568,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 29:
+ case 30:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -576,7 +578,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 30:
+ case 31:
if (buf.remaining() < 4)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b53c1c8..9fa40d3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
/**
@@ -87,6 +88,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
* @param txNodes Transaction nodes mapping.
*/
public GridDhtTxLocal(
+ GridCacheSharedContext<K, V> cctx,
UUID nearNodeId,
GridCacheVersion nearXidVer,
IgniteUuid nearFutId,
@@ -94,7 +96,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
long nearThreadId,
boolean implicit,
boolean implicitSingle,
- GridCacheSharedContext<K, V> cctx,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -108,10 +110,11 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
int taskNameHash
) {
super(
+ cctx,
cctx.versions().onReceivedAndNext(nearNodeId, nearXidVer),
implicit,
implicitSingle,
- cctx,
+ sys,
concurrency,
isolation,
timeout,
@@ -625,7 +628,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
nearFinMiniId, err);
try {
- cctx.io().send(nearNodeId, res);
+ cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ignored) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 461ea04..035f9a2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -69,6 +69,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
* @param implicit Implicit flag.
* @param implicitSingle Implicit-with-single-key flag.
* @param cctx Cache context.
+ * @param sys System flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -77,10 +78,11 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
* @param partLock If this is a group-lock transaction and the whole partition should be locked.
*/
protected GridDhtTxLocalAdapter(
+ GridCacheSharedContext<K, V> cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
- GridCacheSharedContext<K, V> cctx,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -92,8 +94,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, concurrency, isolation, timeout, invalidate, storeEnabled, txSize,
- grpLockKey, partLock, subjId, taskNameHash);
+ super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled,
+ txSize, grpLockKey, partLock, subjId, taskNameHash);
assert cctx != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index f872cf9..84c8f0f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
*
@@ -276,7 +277,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t);
try {
- cctx.io().send(tx.nearNodeId(), res);
+ cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e);
@@ -386,7 +387,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
res.pending(localDhtPendingVersions(tx.writeEntries(), min));
- cctx.io().send(tx.nearNodeId(), res);
+ cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
return true;
@@ -676,7 +677,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
//noinspection TryWithIdenticalCatches
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException e) {
fut.onResult(e);
@@ -730,7 +731,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
//noinspection TryWithIdenticalCatches
try {
- cctx.io().send(nearMapping.node(), req);
+ cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException e) {
fut.onResult(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 61dfe70..e6f9051 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -352,37 +352,37 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
}
switch (commState.idx) {
- case 21:
+ case 22:
if (!commState.putGridUuid(futId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putBitSet(invalidateNearEntries))
return false;
commState.idx++;
- case 23:
+ case 24:
if (!commState.putBoolean(last))
return false;
commState.idx++;
- case 24:
+ case 25:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 25:
+ case 26:
if (!commState.putUuid(nearNodeId))
return false;
commState.idx++;
- case 26:
+ case 27:
if (nearWritesBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearWritesBytes.size()))
@@ -409,37 +409,37 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 27:
+ case 28:
if (!commState.putCacheVersion(nearXidVer))
return false;
commState.idx++;
- case 28:
+ case 29:
if (!commState.putByteArray(ownedBytes))
return false;
commState.idx++;
- case 29:
+ case 30:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 30:
+ case 31:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 31:
+ case 32:
if (!commState.putInt(taskNameHash))
return false;
commState.idx++;
- case 32:
+ case 33:
if (!commState.putBitSet(preloadKeys))
return false;
@@ -459,7 +459,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
return false;
switch (commState.idx) {
- case 21:
+ case 22:
IgniteUuid futId0 = commState.getGridUuid();
if (futId0 == GRID_UUID_NOT_READ)
@@ -469,7 +469,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 22:
+ case 23:
BitSet invalidateNearEntries0 = commState.getBitSet();
if (invalidateNearEntries0 == BIT_SET_NOT_READ)
@@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 23:
+ case 24:
if (buf.remaining() < 1)
return false;
@@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 24:
+ case 25:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -497,7 +497,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 25:
+ case 26:
UUID nearNodeId0 = commState.getUuid();
if (nearNodeId0 == UUID_NOT_READ)
@@ -507,7 +507,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 26:
+ case 27:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -536,7 +536,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 27:
+ case 28:
GridCacheVersion nearXidVer0 = commState.getCacheVersion();
if (nearXidVer0 == CACHE_VER_NOT_READ)
@@ -546,7 +546,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 28:
+ case 29:
byte[] ownedBytes0 = commState.getByteArray();
if (ownedBytes0 == BYTE_ARR_NOT_READ)
@@ -556,7 +556,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 29:
+ case 30:
if (buf.remaining() < 8)
return false;
@@ -564,7 +564,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 30:
+ case 31:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -574,7 +574,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 31:
+ case 32:
if (buf.remaining() < 4)
return false;
@@ -582,7 +582,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 32:
+ case 33:
BitSet preloadKeys0 = commState.getBitSet();
if (preloadKeys0 == BIT_SET_NOT_READ)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 3bc41b2..e71b0fb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -59,6 +59,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param topVer Topology version.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -70,6 +71,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param txNodes Transaction nodes mapping.
*/
public GridDhtTxRemote(
+ GridCacheSharedContext<K, V> ctx,
UUID nearNodeId,
IgniteUuid rmtFutId,
UUID nodeId,
@@ -77,11 +79,11 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
long topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
GridCacheVersion nearXidVer,
@@ -89,7 +91,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
@@ -118,6 +120,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param topVer Topology version.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -127,6 +130,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param grpLockKey Group lock key if transaction is group-lock.
*/
public GridDhtTxRemote(
+ GridCacheSharedContext<K, V> ctx,
UUID nearNodeId,
IgniteUuid rmtFutId,
UUID nodeId,
@@ -135,17 +139,17 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
long topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 535f46e..baa7ead 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -32,6 +32,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
* Colocated cache lock future.
@@ -832,7 +833,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
@@ -847,7 +848,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
index b67229d..f0ce36b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -31,6 +31,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
* Cache lock future.
@@ -1099,7 +1100,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
@@ -1114,7 +1115,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 9b4d117..7a71452 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -247,17 +247,18 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx == null) {
tx = new GridNearTxRemote<>(
+ ctx.shared(),
nodeId,
req.nearNodeId(),
req.nearXidVersion(),
req.threadId(),
req.version(),
null,
+ ctx.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
req.timeout(),
- ctx.shared(),
req.txSize(),
req.groupLockKey(),
req.subjectId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 176fdd0..4a438bf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
@@ -28,6 +27,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
/**
@@ -338,6 +338,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.threadId(),
commit,
tx.isInvalidate(),
+ tx.system(),
tx.syncCommit(),
tx.syncRollback(),
m.explicitLock(),
@@ -373,7 +374,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
cctx.tm().beforeFinishRemote(n.id(), tx.threadId());
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
// If we don't wait for result, then mark future as done.
if (!isSync() && !m.explicitLock())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 90cdfe6..76976e8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -58,6 +58,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
* @param threadId Thread ID.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
+ * @param sys System flag.
* @param explicitLock Explicit lock flag.
* @param topVer Topology version.
* @param baseVer Base version.
@@ -73,6 +74,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
long threadId,
boolean commit,
boolean invalidate,
+ boolean sys,
boolean syncCommit,
boolean syncRollback,
boolean explicitLock,
@@ -85,7 +87,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
Collection<GridCacheTxEntry<K, V>> recoverEntries,
@Nullable UUID subjId,
int taskNameHash) {
- super(xidVer, futId, null, threadId, commit, invalidate, syncCommit, syncRollback, baseVer, committedVers,
+ super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, committedVers,
rolledbackVers, txSize, writeEntries, recoverEntries, null);
this.explicitLock = explicitLock;
@@ -175,31 +177,31 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
}
switch (commState.idx) {
- case 20:
+ case 21:
if (!commState.putBoolean(explicitLock))
return false;
commState.idx++;
- case 21:
+ case 22:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 23:
+ case 24:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 24:
+ case 25:
if (!commState.putInt(taskNameHash))
return false;
@@ -219,7 +221,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
return false;
switch (commState.idx) {
- case 20:
+ case 21:
if (buf.remaining() < 1)
return false;
@@ -227,7 +229,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 21:
+ case 22:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -237,7 +239,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 22:
+ case 23:
if (buf.remaining() < 8)
return false;
@@ -245,7 +247,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 23:
+ case 24:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -255,7 +257,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 24:
+ case 25:
if (buf.remaining() < 4)
return false;