You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/19 07:16:09 UTC
[2/9] incubator-ignite git commit: GG-9141 - Fixes for DR transactions.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
index 2b974e9..df4dd58 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -81,6 +81,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
* @param ctx Cache registry.
* @param implicit Implicit flag.
* @param implicitSingle Implicit with one key flag.
+ * @param sys System flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -92,6 +93,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
GridCacheSharedContext<K, V> ctx,
boolean implicit,
boolean implicitSingle,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -104,10 +106,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
int taskNameHash
) {
super(
+ ctx,
ctx.versions().next(),
implicit,
implicitSingle,
- ctx,
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 5bc5a3e..9098b73 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -30,6 +30,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
/**
@@ -640,7 +641,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
add(fut); // Append new future.
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 30b7aef..431e134 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -242,19 +242,19 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
}
switch (commState.idx) {
- case 21:
+ case 22:
if (!commState.putGridUuid(futId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putBoolean(last))
return false;
commState.idx++;
- case 23:
+ case 24:
if (lastBackups != null) {
if (commState.it == null) {
if (!commState.putInt(lastBackups.size()))
@@ -281,31 +281,31 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 24:
+ case 25:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 25:
+ case 26:
if (!commState.putBoolean(near))
return false;
commState.idx++;
- case 26:
+ case 27:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 27:
+ case 28:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 28:
+ case 29:
if (!commState.putInt(taskNameHash))
return false;
@@ -325,7 +325,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
return false;
switch (commState.idx) {
- case 21:
+ case 22:
IgniteUuid futId0 = commState.getGridUuid();
if (futId0 == GRID_UUID_NOT_READ)
@@ -335,7 +335,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 22:
+ case 23:
if (buf.remaining() < 1)
return false;
@@ -343,7 +343,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 23:
+ case 24:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -372,7 +372,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 24:
+ case 25:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -382,7 +382,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 25:
+ case 26:
if (buf.remaining() < 1)
return false;
@@ -390,7 +390,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 26:
+ case 27:
if (buf.remaining() < 8)
return false;
@@ -398,7 +398,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 27:
+ case 28:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -408,7 +408,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 28:
+ case 29:
if (buf.remaining() < 4)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index aa3546b..938e2d2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -62,6 +62,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -73,24 +74,25 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @throws IgniteCheckedException If unmarshalling failed.
*/
public GridNearTxRemote(
+ GridCacheSharedContext<K, V> ctx,
ClassLoader ldr,
UUID nodeId,
UUID nearNodeId,
long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
Collection<GridCacheTxEntry<K, V>> writeEntries,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) throws IgniteCheckedException {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
@@ -119,6 +121,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -128,23 +131,24 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
*/
public GridNearTxRemote(
+ GridCacheSharedContext<K, V> ctx,
UUID nodeId,
UUID nearNodeId,
GridCacheVersion nearXidVer,
long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
index cebd888..aac60fd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
@@ -60,8 +60,8 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> {
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, ctx.versions().next(), implicit, implicitSingle, concurrency, isolation, timeout, false, true, txSize,
- null, false, subjId, taskNameHash);
+ super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true,
+ txSize, null, false, subjId, taskNameHash);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
index 60eec9e..aece7cf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.transactions;
import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -25,7 +26,7 @@ import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
/**
* Grid transactions implementation.
*/
-public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
+public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
/** Cache shared context. */
private GridCacheSharedContext<K, V> cctx;
@@ -44,7 +45,9 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
cfg.getDefaultTxConcurrency(),
cfg.getDefaultTxIsolation(),
cfg.getDefaultTxTimeout(),
- 0);
+ 0,
+ false
+ );
}
/** {@inheritDoc} */
@@ -58,7 +61,8 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
concurrency,
isolation,
cfg.getDefaultTxTimeout(),
- 0
+ 0,
+ false
);
}
@@ -74,7 +78,24 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
concurrency,
isolation,
timeout,
- txSize
+ txSize,
+ false
+ );
+ }
+
+ @Override public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation,
+ long timeout, int txSize) {
+ A.notNull(concurrency, "concurrency");
+ A.notNull(isolation, "isolation");
+ A.ensure(timeout >= 0, "timeout cannot be negative");
+ A.ensure(txSize >= 0, "transaction size cannot be negative");
+
+ return txStart0(
+ concurrency,
+ isolation,
+ timeout,
+ txSize,
+ true
);
}
@@ -83,10 +104,11 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
* @param isolation Transaction isolation.
* @param timeout Transaction timeout.
* @param txSize Expected transaction size.
+ * @param sys System flag.
* @return Transaction.
*/
private GridCacheTx txStart0(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation,
- long timeout, int txSize) {
+ long timeout, int txSize, boolean sys) {
GridTransactionsConfiguration cfg = cctx.gridConfig().getTransactionsConfiguration();
if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
@@ -102,6 +124,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
tx = cctx.tm().newTx(
false,
false,
+ sys,
concurrency,
isolation,
timeout,
@@ -128,7 +151,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
throw new IllegalArgumentException("Failed to find cache with given name (cache is not configured): " +
cacheName);
- return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize);
+ return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize, false);
}
/** {@inheritDoc} */
@@ -142,7 +165,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
Object grpLockKey = cache.context().affinity().partitionAffinityKey(partId);
- return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize);
+ return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize, false);
}
/**
@@ -155,13 +178,14 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
* should be a unique partition-specific key.
* @param timeout Tx timeout.
* @param txSize Expected transaction size.
+ * @param sys System flag.
* @return Started transaction.
* @throws IllegalStateException If other transaction was already started.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("unchecked")
private GridCacheTx txStartGroupLock(GridCacheContext ctx, Object grpLockKey, GridCacheTxConcurrency concurrency,
- GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize)
+ GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize, boolean sys)
throws IllegalStateException, IgniteCheckedException {
GridCacheTx tx = cctx.tm().userTx();
@@ -172,6 +196,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
GridCacheTxLocalAdapter<K, V> tx0 = cctx.tm().newTx(
false,
false,
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
index 6044900..e1e24bb 100644
--- a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
+++ b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
@@ -64,16 +64,17 @@ public class GridCacheJtaManager<K, V> extends GridCacheJtaManagerAdapter<K, V>
.getTransactionsConfiguration();
tx = cctx.tm().newTx(
- false,
- false,
+ /*implicit*/false,
+ /*implicit single*/false,
+ /*system*/false,
tCfg.getDefaultTxConcurrency(),
tCfg.getDefaultTxIsolation(),
tCfg.getDefaultTxTimeout(),
- false,
- true,
- 0,
- /** group lock keys */null,
- /** partition lock */false
+ /*invalidate*/false,
+ /*store enabled*/true,
+ /*tx size*/0,
+ /*group lock keys*/null,
+ /*partition lock*/false
);
}