You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/05/13 11:31:56 UTC

[ignite] branch master updated: IGNITE-12969 TxRecovery discovery listener should implement HighPriorityListener (#7779)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fd6fdf  IGNITE-12969 TxRecovery discovery listener should implement HighPriorityListener	 (#7779)
4fd6fdf is described below

commit 4fd6fdff4972534e11e244fc0049e275130966ae
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Wed May 13 14:31:33 2020 +0300

    IGNITE-12969 TxRecovery discovery listener should implement HighPriorityListener	 (#7779)
---
 .../cache/distributed/dht/NearTxResultHandler.java |  4 +++
 .../cache/transactions/IgniteTxManager.java        | 32 ++++++++++++++++------
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
index 1332b09..8510da4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
@@ -108,6 +109,9 @@ public final class NearTxResultHandler implements CI1<IgniteInternalFuture<GridC
         try {
             cctx.io().send(nearNodeId, res, cctx.ioPolicy());
         }
+        catch (ClusterTopologyCheckedException e) {
+            fut.onNodeLeft(nearNodeId);
+        }
         catch (IgniteCheckedException e) {
             U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" +
                 "tx=" + CU.txString(tx) +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 823067c..f839ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
@@ -331,20 +332,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             }
         };
 
+        cctx.gridEvents().addDiscoveryEventListener(new TransactionRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT);
+
         cctx.gridEvents().addDiscoveryEventListener(
             new DiscoveryEventListener() {
                 @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
                     if (evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) {
                         UUID nodeId = evt.eventNode().id();
 
-                        IgniteInternalFuture<?> recInitFut = cctx.kernalContext().closure().runLocalSafe(
-                            new TxRecoveryInitRunnable(evt.eventNode(), cctx.coordinators().currentCoordinator()));
-
-                        recInitFut.listen(future -> {
-                            if (future.error() != null)
-                                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, future.error()));
-                        });
-
                         for (TxDeadlockFuture fut : deadlockDetectFuts.values())
                             fut.onNodeLeft(nodeId);
 
@@ -3389,4 +3384,25 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return S.toString(TxTimeoutOnPartitionMapExchangeChangeFuture.class, this);
         }
     }
+
+    /**
+     * Starts the tx recovery.
+     */
+    private class TransactionRecoveryListener implements DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            IgniteInternalFuture<?> recInitFut = cctx.kernalContext().closure().runLocalSafe(
+                new TxRecoveryInitRunnable(evt.eventNode(), cctx.coordinators().currentCoordinator()));
+
+            recInitFut.listen(future -> {
+                if (future.error() != null)
+                    cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, future.error()));
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+    }
 }