You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2023/08/04 09:38:33 UTC

[ignite] branch master updated: IGNITE-20123 IgniteTxHandler initial cleanup (#10869)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 853b3da0529 IGNITE-20123 IgniteTxHandler initial cleanup (#10869)
853b3da0529 is described below

commit 853b3da05299255269c6d6e6aaf7840594ed8c47
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Fri Aug 4 12:38:25 2023 +0300

    IGNITE-20123 IgniteTxHandler initial cleanup (#10869)
---
 .../cache/transactions/IgniteTxHandler.java        | 299 ++++++++-------------
 1 file changed, 110 insertions(+), 189 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index c503957b838..b3d93189a9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -91,9 +91,6 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -136,7 +133,7 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
  */
 public class IgniteTxHandler {
     /** Logger. */
-    private IgniteLogger log;
+    private final IgniteLogger log;
 
     /** */
     private final IgniteLogger txPrepareMsgLog;
@@ -148,7 +145,7 @@ public class IgniteTxHandler {
     private final IgniteLogger txRecoveryMsgLog;
 
     /** Shared cache context. */
-    private GridCacheSharedContext<?, ?> ctx;
+    private final GridCacheSharedContext<?, ?> ctx;
 
     /**
      * @param nearNodeId Sender node ID.
@@ -182,16 +179,13 @@ public class IgniteTxHandler {
      * @param nearNode Sender node.
      * @param req Request.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> processNearTxPrepareRequest0(
-        ClusterNode nearNode,
-        GridNearTxPrepareRequest req
-    ) {
+    private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
         IgniteInternalFuture<GridNearTxPrepareResponse> fut;
 
         if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
             for (;;) {
                 if (waitForExchangeFuture(nearNode, req))
-                    return new GridFinishedFuture<>();
+                    return;
 
                 fut = prepareNearTx(nearNode, req);
 
@@ -204,8 +198,6 @@ public class IgniteTxHandler {
 
         assert req.txState() != null || fut == null || fut.error() != null ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
-
-        return fut;
     }
 
     /**
@@ -220,68 +212,38 @@ public class IgniteTxHandler {
         txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
         txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
 
-        ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+            processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg));
 
-        ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+            processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg));
 
-        ctx.io().addCacheHandler(GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridNearTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+            processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg));
 
-        ctx.io().addCacheHandler(GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridNearTxFinishResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+            processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg));
 
-        ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+            processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg));
 
-        ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+            processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg));
 
-        ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+            processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg));
 
-        ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, (UUID nodeId, GridCacheMessage msg) ->
+            processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg));
 
-        ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
-            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
-                processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
-            }
-        });
+        ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, (UUID nodeId, GridCacheMessage msg) ->
+            processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg));
 
-        ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class,
-            (CI2<UUID, GridCacheTxRecoveryRequest>)this::processCheckPreparedTxRequest);
+        ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class, this::processCheckPreparedTxRequest);
 
-        ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class,
-            (CI2<UUID, GridCacheTxRecoveryResponse>)this::processCheckPreparedTxResponse);
+        ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class, this::processCheckPreparedTxResponse);
 
-        ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class,
-            (CI2<UUID, IncrementalSnapshotAwareMessage>)this::processIncrementalSnapshotAwareMessage);
+        ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class, this::processIncrementalSnapshotAwareMessage);
     }
 
     /** */
@@ -373,31 +335,29 @@ public class IgniteTxHandler {
 
         IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
 
-        return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
-            @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
-                try {
-                    return f.get();
-                }
-                catch (Exception e) {
-                    locTx.setRollbackOnly(); // Just in case.
+        return fut.chain((IgniteInternalFuture<GridNearTxPrepareResponse> f) -> {
+            try {
+                return f.get();
+            }
+            catch (Exception e) {
+                locTx.setRollbackOnly(); // Just in case.
 
-                    if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
-                        !X.hasCause(e, IgniteFutureCancelledException.class))
-                        U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
-
-                    return new GridNearTxPrepareResponse(
-                        req.partition(),
-                        req.version(),
-                        req.futureId(),
-                        req.miniId(),
-                        req.version(),
-                        req.version(),
-                        null,
-                        e,
-                        null,
-                        req.onePhaseCommit(),
-                        req.deployInfo() != null);
-                }
+                if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+                    !X.hasCause(e, IgniteFutureCancelledException.class))
+                    U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
+
+                return new GridNearTxPrepareResponse(
+                    req.partition(),
+                    req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.version(),
+                    req.version(),
+                    null,
+                    e,
+                    null,
+                    req.onePhaseCommit(),
+                    req.deployInfo() != null);
             }
         });
     }
@@ -658,18 +618,16 @@ public class IgniteTxHandler {
 
             final GridDhtTxLocal tx0 = tx;
 
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> txFut) {
-                    try {
-                        txFut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        tx0.setRollbackOnly(); // Just in case.
+            fut.listen((IgniteInternalFuture<?> txFut) -> {
+                try {
+                    txFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    tx0.setRollbackOnly(); // Just in case.
 
-                        if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
-                            !X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping())
-                            U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
-                    }
+                    if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+                        !X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping())
+                        U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
                 }
             });
 
@@ -806,7 +764,7 @@ public class IgniteTxHandler {
                     nodeId + ']');
 
             GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
-                .<IgniteInternalTx>versionedFuture(res.version(), res.futureId());
+                .versionedFuture(res.version(), res.futureId());
 
             if (fut == null) {
                 U.warn(log, "Failed to find future for near prepare response [txId=" + res.version() +
@@ -836,7 +794,7 @@ public class IgniteTxHandler {
             if (txFinishMsgLog.isDebugEnabled())
                 txFinishMsgLog.debug("Received near finish response [txId=" + res.xid() + ", node=" + nodeId + ']');
 
-            GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
+            GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId());
 
             if (fut == null) {
                 if (txFinishMsgLog.isDebugEnabled()) {
@@ -897,7 +855,7 @@ public class IgniteTxHandler {
             assert res != null;
 
             if (res.checkCommitted()) {
-                GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
+                GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId());
 
                 if (fut == null) {
                     if (txFinishMsgLog.isDebugEnabled()) {
@@ -918,7 +876,7 @@ public class IgniteTxHandler {
                 fut.onResult(nodeId, res);
             }
             else {
-                GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
+                GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().future(res.futureId());
 
                 if (fut == null) {
                     if (txFinishMsgLog.isDebugEnabled()) {
@@ -944,12 +902,8 @@ public class IgniteTxHandler {
     /**
      * @param nodeId Node ID.
      * @param req Request.
-     * @return Future.
      */
-    @Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(
-        UUID nodeId,
-        GridNearTxFinishRequest req
-    ) {
+    private void processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) {
         try (TraceSurroundings ignored =
                  MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_FINISH_REQ, MTC.span()))) {
             if (txFinishMsgLog.isDebugEnabled())
@@ -961,8 +915,6 @@ public class IgniteTxHandler {
             assert req.txState() != null || fut == null || fut.error() != null ||
                 (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) :
                 "[req=" + req + ", fut=" + fut + "]";
-
-            return fut;
         }
     }
 
@@ -988,7 +940,7 @@ public class IgniteTxHandler {
 
         // Transaction on local cache only.
         if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
-            return new GridFinishedFuture<IgniteInternalTx>(locTx);
+            return new GridFinishedFuture<>(locTx);
 
         if (log.isDebugEnabled())
             log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
@@ -1325,37 +1277,15 @@ public class IgniteTxHandler {
             }
 
             if (req.onePhaseCommit()) {
-                IgniteInternalFuture completeFut;
-
-                IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
-                    null : dhtTx.done() ? null : dhtTx.finishFuture();
-
-                final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
-                    null : nearTx.done() ? null : nearTx.finishFuture();
-
-                if (dhtFin != null && nearFin != null) {
-                    GridCompoundFuture fut = new GridCompoundFuture();
-
-                    fut.add(dhtFin);
-                    fut.add(nearFin);
-
-                    fut.markInitialized();
-
-                    completeFut = fut;
-                }
-                else
-                    completeFut = dhtFin != null ? dhtFin : nearFin;
+                IgniteInternalFuture<IgniteInternalTx> completeFut = completeFuture(dhtTx, nearTx);
 
                 if (completeFut != null) {
                     final GridDhtTxPrepareResponse res0 = res;
                     final GridDhtTxRemote dhtTx0 = dhtTx;
                     final GridNearTxRemote nearTx0 = nearTx;
 
-                    completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
-                            sendReply(nodeId, req, res0, dhtTx0, nearTx0);
-                        }
-                    });
+                    completeFut.listen((IgniteInternalFuture<IgniteInternalTx> fut) ->
+                        sendReply(nodeId, req, res0, dhtTx0, nearTx0));
                 }
                 else
                     sendReply(nodeId, req, res, dhtTx, nearTx);
@@ -1368,6 +1298,31 @@ public class IgniteTxHandler {
         }
     }
 
+    /** Combines finish futures of the given transactions that are not done yet; {@code null} if there are none.
+     * @param dhtTx DHT transaction, may be {@code null}.
+     * @param nearTx Near transaction, may be {@code null}.
+     */
+    private IgniteInternalFuture<IgniteInternalTx> completeFuture(GridDhtTxRemote dhtTx, GridNearTxRemote nearTx) {
+        IgniteInternalFuture<IgniteInternalTx> dhtFin =
+            dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); // Null if absent or done.
+
+        final IgniteInternalFuture<IgniteInternalTx> nearFin =
+            nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); // Null if absent or done.
+
+        if (dhtFin != null && nearFin != null) { // Both are not done yet: join them in one compound future.
+            GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>();
+
+            fut.add(dhtFin);
+            fut.add(nearFin);
+
+            fut.markInitialized();
+
+            return fut;
+        }
+        else
+            return dhtFin != null ? dhtFin : nearFin; // The single not-done future, or null when there is none.
+    }
+
     /**
      * @param nodeId Node ID.
      * @param req Request.
@@ -1391,7 +1346,6 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    @SuppressWarnings({"unchecked"})
     private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
         try (TraceSurroundings ignored =
                  MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_FINISH_REQ, MTC.span()))) {
@@ -1406,11 +1360,7 @@ public class IgniteTxHandler {
                 else {
                     IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
 
-                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> fut) {
-                            sendReply(nodeId, req, true, null);
-                        }
-                    });
+                    fut.listen((IgniteInternalFuture<?> f) -> sendReply(nodeId, req, true, null));
                 }
 
                 return;
@@ -1435,7 +1385,7 @@ public class IgniteTxHandler {
                 ctx.tm().addCommittedTx(null, req.version(), null);
 
             if (dhtTx != null)
-                finish(nodeId, dhtTx, req);
+                finish(dhtTx, req);
             else {
                 try {
                     applyPartitionsUpdatesCounters(req.updateCounters(), !req.commit(), false);
@@ -1446,36 +1396,14 @@ public class IgniteTxHandler {
             }
 
             if (nearTx != null)
-                finish(nodeId, nearTx, req);
+                finish(nearTx, req);
 
             if (req.replyRequired()) {
-                IgniteInternalFuture completeFut;
-
-                IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
-                    null : dhtTx.done() ? null : dhtTx.finishFuture();
-
-                final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
-                    null : nearTx.done() ? null : nearTx.finishFuture();
-
-                if (dhtFin != null && nearFin != null) {
-                    GridCompoundFuture fut = new GridCompoundFuture();
-
-                    fut.add(dhtFin);
-                    fut.add(nearFin);
-
-                    fut.markInitialized();
-
-                    completeFut = fut;
-                }
-                else
-                    completeFut = dhtFin != null ? dhtFin : nearFin;
+                IgniteInternalFuture<IgniteInternalTx> completeFut = completeFuture(dhtTx, nearTx);
 
                 if (completeFut != null) {
-                    completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
-                            sendReply(nodeId, req, true, nearTxId);
-                        }
-                    });
+                    completeFut.listen((IgniteInternalFuture<IgniteInternalTx> fut) ->
+                        sendReply(nodeId, req, true, nearTxId));
                 }
                 else
                     sendReply(nodeId, req, true, nearTxId);
@@ -1488,15 +1416,10 @@ public class IgniteTxHandler {
     }
 
     /**
-     * @param nodeId Node ID.
      * @param tx Transaction.
      * @param req Request.
      */
-    protected void finish(
-        UUID nodeId,
-        IgniteTxRemoteEx tx,
-        GridDhtTxFinishRequest req
-    ) {
+    protected void finish(IgniteTxRemoteEx tx, GridDhtTxFinishRequest req) {
         assert tx != null;
 
         req.txState(tx.txState());
@@ -2209,7 +2132,7 @@ public class IgniteTxHandler {
             boolean prepared;
 
             try {
-                prepared = fut == null ? true : fut.get();
+                prepared = fut == null || fut.get();
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
@@ -2220,21 +2143,19 @@ public class IgniteTxHandler {
             sendCheckPreparedResponse(nodeId, req, prepared);
         }
         else {
-            fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                    boolean prepared;
-
-                    try {
-                        prepared = fut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
+            fut.listen((IgniteInternalFuture<Boolean> f) -> {
+                boolean prepared;
 
-                        prepared = false;
-                    }
+                try {
+                    prepared = fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
 
-                    sendCheckPreparedResponse(nodeId, req, prepared);
+                    prepared = false;
                 }
+
+                sendCheckPreparedResponse(nodeId, req, prepared);
             });
         }
     }
@@ -2425,14 +2346,14 @@ public class IgniteTxHandler {
         AffinityTopologyVersion top = tx.topologyVersionSnapshot();
 
         for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
-            GridDhtPartitionTopology topology = ctx.cacheContext(partCntrs.cacheId()).topology();
+            GridDhtPartitionTopology partTop = ctx.cacheContext(partCntrs.cacheId()).topology();
 
             PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
 
             for (int i = 0; i < partCntrs.size(); i++) {
                 int part = partCntrs.partition(i);
 
-                if (topology.nodes(part, top).indexOf(node) > 0)
+                if (partTop.nodes(part, top).indexOf(node) > 0)
                     resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
             }