You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 16:35:37 UTC
[45/50] [abbrv] ignite git commit: Changed tx mini future ids from
IgniteUuid to int, removed some legacy code from tx processing.
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 90a68ad..56a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -76,13 +76,11 @@ import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
-import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -114,7 +112,7 @@ public class IgniteTxHandler {
* @param req Request.
* @return Prepare future.
*/
- public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+ private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
@@ -272,6 +270,7 @@ public class IgniteTxHandler {
U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
return new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -287,6 +286,27 @@ public class IgniteTxHandler {
}
/**
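+ * @param entries Entries to unmarshal, may be {@code null}.
+ * @return First entry in iteration order, or {@code null} if {@code entries} is {@code null} or empty.
+ * @throws IgniteCheckedException If unmarshalling of an entry fails.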
+ */
+ private IgniteTxEntry unmarshal(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
+ if (entries == null)
+ return null;
+
+ IgniteTxEntry firstEntry = null;
+
+ for (IgniteTxEntry e : entries) {
+ e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+ if (firstEntry == null)
+ firstEntry = e;
+ }
+
+ return firstEntry;
+ }
+
+ /**
* Prepares near transaction.
*
* @param nearNodeId Near node ID that initiated transaction.
@@ -309,15 +329,13 @@ public class IgniteTxHandler {
return null;
}
- IgniteTxEntry firstEntry = null;
+ IgniteTxEntry firstEntry;
try {
- for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
- e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+ IgniteTxEntry firstWrite = unmarshal(req.writes());
+ IgniteTxEntry firstRead = unmarshal(req.reads());
- if (firstEntry == null)
- firstEntry = e;
- }
+ firstEntry = firstWrite != null ? firstWrite : firstRead;
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -364,6 +382,7 @@ public class IgniteTxHandler {
}
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -449,17 +468,16 @@ public class IgniteTxHandler {
tx.transactionNodes(req.transactionNodes());
- // Set near on originating node flag only if the sender node has new version.
- if (req.near() && FINISH_NEAR_ONE_PHASE_SINCE.compareTo(nearNode.version()) <= 0)
+ if (req.near())
tx.nearOnOriginatingNode(true);
if (req.onePhaseCommit()) {
- assert req.last();
+ assert req.last() : req;
tx.onePhaseCommit(true);
}
- if (req.returnValue())
+ if (req.needReturnValue())
tx.needReturnValue(true);
IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
@@ -778,8 +796,13 @@ public class IgniteTxHandler {
", commit=" + req.commit() + ']');
// Always send finish response.
- GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
- req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+ GridCacheMessage res = new GridNearTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.threadId(),
+ req.futureId(),
+ req.miniId(),
+ new IgniteCheckedException("Transaction has been already completed."));
try {
ctx.io().send(nodeId, res, req.policy());
@@ -819,14 +842,9 @@ public class IgniteTxHandler {
try {
assert tx != null : "Transaction is null for near finish request [nodeId=" +
nodeId + ", req=" + req + "]";
+ assert req.syncMode() != null : req;
- if (req.syncMode() == null) {
- boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
-
- tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
- }
- else
- tx.syncMode(req.syncMode());
+ tx.syncMode(req.syncMode());
if (req.commit()) {
tx.storeEnabled(req.storeEnabled());
@@ -920,7 +938,7 @@ public class IgniteTxHandler {
* @param nodeId Sender node ID.
* @param req Request.
*/
- protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
+ private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -938,7 +956,12 @@ public class IgniteTxHandler {
GridDhtTxPrepareResponse res;
try {
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.deployInfo() != null);
// Start near transaction first.
nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
@@ -990,7 +1013,12 @@ public class IgniteTxHandler {
if (nearTx != null)
nearTx.rollback();
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e,
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ e,
req.deployInfo() != null);
}
@@ -1041,7 +1069,7 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+ private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
final GridDhtTxOnePhaseCommitAckRequest req) {
assert nodeId != null;
assert req != null;
@@ -1058,14 +1086,14 @@ public class IgniteTxHandler {
* @param req Request.
*/
@SuppressWarnings({"unchecked"})
- protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
+ private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
assert nodeId != null;
assert req != null;
if (req.checkCommitted()) {
boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
- if (!committed || !req.syncCommit())
+ if (!committed || req.syncMode() != FULL_SYNC)
sendReply(nodeId, req, committed, null);
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
@@ -1301,9 +1329,13 @@ public class IgniteTxHandler {
* @param committed {@code True} if transaction committed on this node.
* @param nearTxId Near tx version.
*/
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
+ protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
if (req.replyRequired() || req.checkCommitted()) {
- GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId());
if (req.checkCommitted()) {
res.checkCommitted(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index bd806aa..b1a4003 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -37,6 +37,7 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -3277,7 +3278,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
}
catch (IgniteCheckedException | RuntimeException e) {
- rollbackAsync();
+ if (!(e instanceof NodeStoppingException))
+ rollbackAsync();
throw e;
}