You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2023/08/04 09:38:33 UTC
[ignite] branch master updated: IGNITE-20123 IgniteTxHandler initial cleanup (#10869)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 853b3da0529 IGNITE-20123 IgniteTxHandler initial cleanup (#10869)
853b3da0529 is described below
commit 853b3da05299255269c6d6e6aaf7840594ed8c47
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Fri Aug 4 12:38:25 2023 +0300
IGNITE-20123 IgniteTxHandler initial cleanup (#10869)
---
.../cache/transactions/IgniteTxHandler.java | 299 ++++++++-------------
1 file changed, 110 insertions(+), 189 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index c503957b838..b3d93189a9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -91,9 +91,6 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -136,7 +133,7 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
*/
public class IgniteTxHandler {
/** Logger. */
- private IgniteLogger log;
+ private final IgniteLogger log;
/** */
private final IgniteLogger txPrepareMsgLog;
@@ -148,7 +145,7 @@ public class IgniteTxHandler {
private final IgniteLogger txRecoveryMsgLog;
/** Shared cache context. */
- private GridCacheSharedContext<?, ?> ctx;
+ private final GridCacheSharedContext<?, ?> ctx;
/**
* @param nearNodeId Sender node ID.
@@ -182,16 +179,13 @@ public class IgniteTxHandler {
* @param nearNode Sender node.
* @param req Request.
*/
- private IgniteInternalFuture<GridNearTxPrepareResponse> processNearTxPrepareRequest0(
- ClusterNode nearNode,
- GridNearTxPrepareRequest req
- ) {
+ private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
IgniteInternalFuture<GridNearTxPrepareResponse> fut;
if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
for (;;) {
if (waitForExchangeFuture(nearNode, req))
- return new GridFinishedFuture<>();
+ return;
fut = prepareNearTx(nearNode, req);
@@ -204,8 +198,6 @@ public class IgniteTxHandler {
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
-
- return fut;
}
/**
@@ -220,68 +212,38 @@ public class IgniteTxHandler {
txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
- ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg);
- }
- });
+ ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+ processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg));
- ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg);
- }
- });
+ ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+ processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg));
- ctx.io().addCacheHandler(GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
- }
- });
+ ctx.io().addCacheHandler(GridNearTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+ processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg));
- ctx.io().addCacheHandler(GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg);
- }
- });
+ ctx.io().addCacheHandler(GridNearTxFinishResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+ processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg));
- ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg);
- }
- });
+ ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+ processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg));
- ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg);
- }
- });
+ ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+ processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg));
- ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
- }
- });
+ ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+ processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg));
- ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
- }
- });
+ ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+ processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg));
- ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
- @Override public void apply(UUID nodeId, GridCacheMessage msg) {
- processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
- }
- });
+ ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+ processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg));
- ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class,
- (CI2<UUID, GridCacheTxRecoveryRequest>)this::processCheckPreparedTxRequest);
+ ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class, this::processCheckPreparedTxRequest);
- ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class,
- (CI2<UUID, GridCacheTxRecoveryResponse>)this::processCheckPreparedTxResponse);
+ ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class, this::processCheckPreparedTxResponse);
- ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class,
- (CI2<UUID, IncrementalSnapshotAwareMessage>)this::processIncrementalSnapshotAwareMessage);
+ ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class, this::processIncrementalSnapshotAwareMessage);
}
/** */
@@ -373,31 +335,29 @@ public class IgniteTxHandler {
IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
- return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
- @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
- try {
- return f.get();
- }
- catch (Exception e) {
- locTx.setRollbackOnly(); // Just in case.
+ return fut.chain((IgniteInternalFuture<GridNearTxPrepareResponse> f) -> {
+ try {
+ return f.get();
+ }
+ catch (Exception e) {
+ locTx.setRollbackOnly(); // Just in case.
- if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
- !X.hasCause(e, IgniteFutureCancelledException.class))
- U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
-
- return new GridNearTxPrepareResponse(
- req.partition(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.version(),
- null,
- e,
- null,
- req.onePhaseCommit(),
- req.deployInfo() != null);
- }
+ if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+ !X.hasCause(e, IgniteFutureCancelledException.class))
+ U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
+
+ return new GridNearTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ e,
+ null,
+ req.onePhaseCommit(),
+ req.deployInfo() != null);
}
});
}
@@ -658,18 +618,16 @@ public class IgniteTxHandler {
final GridDhtTxLocal tx0 = tx;
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> txFut) {
- try {
- txFut.get();
- }
- catch (IgniteCheckedException e) {
- tx0.setRollbackOnly(); // Just in case.
+ fut.listen((IgniteInternalFuture<?> txFut) -> {
+ try {
+ txFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ tx0.setRollbackOnly(); // Just in case.
- if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
- !X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping())
- U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
- }
+ if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+ !X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping())
+ U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
}
});
@@ -806,7 +764,7 @@ public class IgniteTxHandler {
nodeId + ']');
GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
- .<IgniteInternalTx>versionedFuture(res.version(), res.futureId());
+ .versionedFuture(res.version(), res.futureId());
if (fut == null) {
U.warn(log, "Failed to find future for near prepare response [txId=" + res.version() +
@@ -836,7 +794,7 @@ public class IgniteTxHandler {
if (txFinishMsgLog.isDebugEnabled())
txFinishMsgLog.debug("Received near finish response [txId=" + res.xid() + ", node=" + nodeId + ']');
- GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
+ GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (txFinishMsgLog.isDebugEnabled()) {
@@ -897,7 +855,7 @@ public class IgniteTxHandler {
assert res != null;
if (res.checkCommitted()) {
- GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
+ GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (txFinishMsgLog.isDebugEnabled()) {
@@ -918,7 +876,7 @@ public class IgniteTxHandler {
fut.onResult(nodeId, res);
}
else {
- GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
+ GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (txFinishMsgLog.isDebugEnabled()) {
@@ -944,12 +902,8 @@ public class IgniteTxHandler {
/**
* @param nodeId Node ID.
* @param req Request.
- * @return Future.
*/
- @Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(
- UUID nodeId,
- GridNearTxFinishRequest req
- ) {
+ private void processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_FINISH_REQ, MTC.span()))) {
if (txFinishMsgLog.isDebugEnabled())
@@ -961,8 +915,6 @@ public class IgniteTxHandler {
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) :
"[req=" + req + ", fut=" + fut + "]";
-
- return fut;
}
}
@@ -988,7 +940,7 @@ public class IgniteTxHandler {
// Transaction on local cache only.
if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
- return new GridFinishedFuture<IgniteInternalTx>(locTx);
+ return new GridFinishedFuture<>(locTx);
if (log.isDebugEnabled())
log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
@@ -1325,37 +1277,15 @@ public class IgniteTxHandler {
}
if (req.onePhaseCommit()) {
- IgniteInternalFuture completeFut;
-
- IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
- null : dhtTx.done() ? null : dhtTx.finishFuture();
-
- final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
- null : nearTx.done() ? null : nearTx.finishFuture();
-
- if (dhtFin != null && nearFin != null) {
- GridCompoundFuture fut = new GridCompoundFuture();
-
- fut.add(dhtFin);
- fut.add(nearFin);
-
- fut.markInitialized();
-
- completeFut = fut;
- }
- else
- completeFut = dhtFin != null ? dhtFin : nearFin;
+ IgniteInternalFuture<IgniteInternalTx> completeFut = completeFuture(dhtTx, nearTx);
if (completeFut != null) {
final GridDhtTxPrepareResponse res0 = res;
final GridDhtTxRemote dhtTx0 = dhtTx;
final GridNearTxRemote nearTx0 = nearTx;
- completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
- sendReply(nodeId, req, res0, dhtTx0, nearTx0);
- }
- });
+ completeFut.listen((IgniteInternalFuture<IgniteInternalTx> fut) ->
+ sendReply(nodeId, req, res0, dhtTx0, nearTx0));
}
else
sendReply(nodeId, req, res, dhtTx, nearTx);
@@ -1368,6 +1298,31 @@ public class IgniteTxHandler {
}
}
+ /**
+ * @param dhtTx Dht tx.
+ * @param nearTx Near tx.
+ */
+ private IgniteInternalFuture<IgniteInternalTx> completeFuture(GridDhtTxRemote dhtTx, GridNearTxRemote nearTx) {
+ IgniteInternalFuture<IgniteInternalTx> dhtFin =
+ dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+ final IgniteInternalFuture<IgniteInternalTx> nearFin =
+ nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture();
+
+ if (dhtFin != null && nearFin != null) {
+ GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>();
+
+ fut.add(dhtFin);
+ fut.add(nearFin);
+
+ fut.markInitialized();
+
+ return fut;
+ }
+ else
+ return dhtFin != null ? dhtFin : nearFin;
+ }
+
/**
* @param nodeId Node ID.
* @param req Request.
@@ -1391,7 +1346,6 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param req Request.
*/
- @SuppressWarnings({"unchecked"})
private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_FINISH_REQ, MTC.span()))) {
@@ -1406,11 +1360,7 @@ public class IgniteTxHandler {
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- sendReply(nodeId, req, true, null);
- }
- });
+ fut.listen((IgniteInternalFuture<?> f) -> sendReply(nodeId, req, true, null));
}
return;
@@ -1435,7 +1385,7 @@ public class IgniteTxHandler {
ctx.tm().addCommittedTx(null, req.version(), null);
if (dhtTx != null)
- finish(nodeId, dhtTx, req);
+ finish(dhtTx, req);
else {
try {
applyPartitionsUpdatesCounters(req.updateCounters(), !req.commit(), false);
@@ -1446,36 +1396,14 @@ public class IgniteTxHandler {
}
if (nearTx != null)
- finish(nodeId, nearTx, req);
+ finish(nearTx, req);
if (req.replyRequired()) {
- IgniteInternalFuture completeFut;
-
- IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
- null : dhtTx.done() ? null : dhtTx.finishFuture();
-
- final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
- null : nearTx.done() ? null : nearTx.finishFuture();
-
- if (dhtFin != null && nearFin != null) {
- GridCompoundFuture fut = new GridCompoundFuture();
-
- fut.add(dhtFin);
- fut.add(nearFin);
-
- fut.markInitialized();
-
- completeFut = fut;
- }
- else
- completeFut = dhtFin != null ? dhtFin : nearFin;
+ IgniteInternalFuture<IgniteInternalTx> completeFut = completeFuture(dhtTx, nearTx);
if (completeFut != null) {
- completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
- sendReply(nodeId, req, true, nearTxId);
- }
- });
+ completeFut.listen((IgniteInternalFuture<IgniteInternalTx> fut) ->
+ sendReply(nodeId, req, true, nearTxId));
}
else
sendReply(nodeId, req, true, nearTxId);
@@ -1488,15 +1416,10 @@ public class IgniteTxHandler {
}
/**
- * @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
*/
- protected void finish(
- UUID nodeId,
- IgniteTxRemoteEx tx,
- GridDhtTxFinishRequest req
- ) {
+ protected void finish(IgniteTxRemoteEx tx, GridDhtTxFinishRequest req) {
assert tx != null;
req.txState(tx.txState());
@@ -2209,7 +2132,7 @@ public class IgniteTxHandler {
boolean prepared;
try {
- prepared = fut == null ? true : fut.get();
+ prepared = fut == null || fut.get();
}
catch (IgniteCheckedException e) {
U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
@@ -2220,21 +2143,19 @@ public class IgniteTxHandler {
sendCheckPreparedResponse(nodeId, req, prepared);
}
else {
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- boolean prepared;
-
- try {
- prepared = fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
+ fut.listen((IgniteInternalFuture<Boolean> f) -> {
+ boolean prepared;
- prepared = false;
- }
+ try {
+ prepared = fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
- sendCheckPreparedResponse(nodeId, req, prepared);
+ prepared = false;
}
+
+ sendCheckPreparedResponse(nodeId, req, prepared);
});
}
}
@@ -2425,14 +2346,14 @@ public class IgniteTxHandler {
AffinityTopologyVersion top = tx.topologyVersionSnapshot();
for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
- GridDhtPartitionTopology topology = ctx.cacheContext(partCntrs.cacheId()).topology();
+ GridDhtPartitionTopology partTop = ctx.cacheContext(partCntrs.cacheId()).topology();
PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
for (int i = 0; i < partCntrs.size(); i++) {
int part = partCntrs.partition(i);
- if (topology.nodes(part, top).indexOf(node) > 0)
+ if (partTop.nodes(part, top).indexOf(node) > 0)
resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
}