You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/05/13 11:31:56 UTC
[ignite] branch master updated: IGNITE-12969 TxRecovery discovery listener should implement HighPriorityListener (#7779)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4fd6fdf IGNITE-12969 TxRecovery discovery listener should implement HighPriorityListener (#7779)
4fd6fdf is described below
commit 4fd6fdff4972534e11e244fc0049e275130966ae
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Wed May 13 14:31:33 2020 +0300
IGNITE-12969 TxRecovery discovery listener should implement HighPriorityListener (#7779)
---
.../cache/distributed/dht/NearTxResultHandler.java | 4 +++
.../cache/transactions/IgniteTxManager.java | 32 ++++++++++++++++------
2 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
index 1332b09..8510da4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
@@ -108,6 +109,9 @@ public final class NearTxResultHandler implements CI1<IgniteInternalFuture<GridC
try {
cctx.io().send(nearNodeId, res, cctx.ioPolicy());
}
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft(nearNodeId);
+ }
catch (IgniteCheckedException e) {
U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" +
"tx=" + CU.txString(tx) +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 823067c..f839ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
@@ -331,20 +332,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
};
+ cctx.gridEvents().addDiscoveryEventListener(new TransactionRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT);
+
cctx.gridEvents().addDiscoveryEventListener(
new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
if (evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) {
UUID nodeId = evt.eventNode().id();
- IgniteInternalFuture<?> recInitFut = cctx.kernalContext().closure().runLocalSafe(
- new TxRecoveryInitRunnable(evt.eventNode(), cctx.coordinators().currentCoordinator()));
-
- recInitFut.listen(future -> {
- if (future.error() != null)
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, future.error()));
- });
-
for (TxDeadlockFuture fut : deadlockDetectFuts.values())
fut.onNodeLeft(nodeId);
@@ -3389,4 +3384,25 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return S.toString(TxTimeoutOnPartitionMapExchangeChangeFuture.class, this);
}
}
+
+ /**
+ * Discovery listener ({@link HighPriorityListener}, order 0) that starts the tx recovery for the event node; a failed start is reported as a critical failure.
+ */
+ private class TransactionRecoveryListener implements DiscoveryEventListener, HighPriorityListener {
+ /** {@inheritDoc} */
+ @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+ IgniteInternalFuture<?> recInitFut = cctx.kernalContext().closure().runLocalSafe(
+ new TxRecoveryInitRunnable(evt.eventNode(), cctx.coordinators().currentCoordinator()));
+
+ recInitFut.listen(future -> {
+ if (future.error() != null)
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, future.error()));
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public int order() {
+ return 0;
+ }
+ }
}