You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/16 13:09:29 UTC

[GitHub] [ignite] timoninmaxim commented on a diff in pull request #10314: IGNITE-17029 Consistent Cut for incremental snapshot

timoninmaxim commented on code in PR #10314:
URL: https://github.com/apache/ignite/pull/10314#discussion_r1071226965


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java:
##########
@@ -218,73 +222,151 @@ public IgniteTxHandler(GridCacheSharedContext ctx) {
         txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
         txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
 
-        ctx.io().addCacheHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(0, GridCacheTxRecoveryRequest.class,
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridCacheTxRecoveryRequest.class,
             new CI2<UUID, GridCacheTxRecoveryRequest>() {
                 @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
                     processCheckPreparedTxRequest(nodeId, req);
                 }
             });
 
-        ctx.io().addCacheHandler(0, GridCacheTxRecoveryResponse.class,
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridCacheTxRecoveryResponse.class,
             new CI2<UUID, GridCacheTxRecoveryResponse>() {
                 @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
                     processCheckPreparedTxResponse(nodeId, res);
                 }
             });
+
+        ctx.io().addCacheHandler(TX_MSG_HND_ID, ConsistentCutAwareMessage.class,
+            new CI2<UUID, ConsistentCutAwareMessage>() {
+                @Override public void apply(UUID nodeId, ConsistentCutAwareMessage msg) {
+                    processConsistentCutAwareMessage(nodeId, msg);
+                }
+            });
+    }
+
+    /** */
+    private void processConsistentCutAwareMessage(UUID nodeId, ConsistentCutAwareMessage msg) {
+        ctx.snapshotMgr().handleConsistentCutId(msg.id());
+
+        setTransactionCutIdIfRequired(msg);
+
+        GridCacheMessage cacheMsg = msg.payload();
+
+        ctx.io()
+            .cacheHandler(TX_MSG_HND_ID, cacheMsg.getClass())
+            .apply(nodeId, cacheMsg);
+    }
+
+    /**
+     * Set received Consistent Cut ID to transaction if specified.
+     *
+     * @param msg Finish message signed with Consistent Cut ID.
+     */
+    private void setTransactionCutIdIfRequired(ConsistentCutAwareMessage msg) {
+        if (msg.txCutId() != null) {
+            IgniteInternalTx tx = findTransactionByMessage(msg.payload());
+
+            if (tx != null)

Review Comment:
   `tx` can be null here, in cases when `ctx.tm().tx(req.version())` returns null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org