You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/01/20 11:44:13 UTC

[ignite] branch master updated: IGNITE-12548 Fixed tx desync during recovery on near node left. - Fixes #7274.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b677b36c IGNITE-12548 Fixed tx desync during recovery on near node left. - Fixes #7274.
b677b36c is described below

commit b677b36cc484c683803e18b98ccb138dec2eedaf
Author: Aleksei Scherbakov <al...@gmail.com>
AuthorDate: Mon Jan 20 14:43:52 2020 +0300

    IGNITE-12548 Fixed tx desync during recovery on near node left. - Fixes #7274.
    
    Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
 .../managers/communication/GridIoManager.java      |  10 +-
 .../distributed/GridCacheTxRecoveryFuture.java     |  15 +-
 .../distributed/dht/GridDhtTxFinishFuture.java     |  38 ++-
 .../cache/transactions/IgniteInternalTx.java       |   5 +
 .../cache/transactions/IgniteTxAdapter.java        |   7 +-
 .../cache/transactions/IgniteTxHandler.java        |  34 ++-
 .../cache/transactions/IgniteTxLocalAdapter.java   |   7 -
 .../cache/transactions/IgniteTxManager.java        |  20 +-
 .../ignite/internal/util/StripedExecutor.java      |  10 +
 .../TxRecoveryWithConcurrentRollbackTest.java      | 340 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite7.java   |   3 +
 11 files changed, 443 insertions(+), 46 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9a763e1..9821edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1398,14 +1398,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return;
         }
 
-        if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
-            ctx.getStripedExecutorService().execute(msg.partition(), c);
+        final int part = msg.partition(); // Store partition to avoid possible recalculation.
+
+        if (plc == GridIoPolicy.SYSTEM_POOL && part != GridIoMessage.STRIPE_DISABLED_PART) {
+            ctx.getStripedExecutorService().execute(part, c);
 
             return;
         }
 
-        if (plc == GridIoPolicy.DATA_STREAMER_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
-            ctx.getDataStreamerExecutorService().execute(msg.partition(), c);
+        if (plc == GridIoPolicy.DATA_STREAMER_POOL && part != GridIoMessage.STRIPE_DISABLED_PART) {
+            ctx.getDataStreamerExecutorService().execute(part, c);
 
             return;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 1abd525..3fc053d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -114,24 +114,15 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
 
         UUID locNodeId = cctx.localNodeId();
 
-        for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
-            if (!locNodeId.equals(e.getKey()) && !failedNodeIds.contains(e.getKey()) && !nodes.containsKey(e.getKey())) {
-                ClusterNode node = cctx.discovery().node(e.getKey());
-
-                if (node != null)
-                    nodes.put(node.id(), node);
-                else if (log.isInfoEnabled())
-                    log.info("Transaction node left (will ignore) " + e.getKey());
-            }
-
-            for (UUID nodeId : e.getValue()) {
+        for (Map.Entry<UUID, Collection<UUID>> entry : tx.transactionNodes().entrySet()) {
+            for (UUID nodeId : F.concat(false, entry.getKey(), entry.getValue())) {
                 if (!locNodeId.equals(nodeId) && !failedNodeIds.contains(nodeId) && !nodes.containsKey(nodeId)) {
                     ClusterNode node = cctx.discovery().node(nodeId);
 
                     if (node != null)
                         nodes.put(node.id(), node);
                     else if (log.isInfoEnabled())
-                        log.info("Transaction node left (will ignore) " + e.getKey());
+                        log.info("Transaction node left (will ignore) " + nodeId);
                 }
             }
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 00d1588..4152993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
@@ -37,7 +38,12 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxSerializationCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -253,6 +259,19 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 if (this.tx.syncMode() != PRIMARY_SYNC)
                     this.tx.sendFinishReply(finishErr);
 
+                if (!this.tx.txState().mvccEnabled() && !commit && shouldApplyCountersOnRollbackError(finishErr)) {
+                    TxCounters txCounters = this.tx.txCounters(false);
+
+                    if (txCounters != null) {
+                        try {
+                            cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCounters.updateCounters(), true, true);
+                        }
+                        catch (IgniteCheckedException e0) {
+                            throw new IgniteException(e0);
+                        }
+                    }
+                }
+
                 // Don't forget to clean up.
                 cctx.mvcc().removeFuture(futId);
 
@@ -264,6 +283,19 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
     }
 
     /**
+     * @param e Exception to check.
+     *
+     * @return {@code True} if counters must be applied.
+     */
+    private boolean shouldApplyCountersOnRollbackError(Throwable e) {
+        return e == null ||
+            e instanceof IgniteTxRollbackCheckedException ||
+            e instanceof IgniteTxTimeoutCheckedException ||
+            e instanceof IgniteTxOptimisticCheckedException ||
+            e instanceof IgniteTxSerializationCheckedException;
+    }
+
+    /**
      * @param f Future.
      * @return {@code True} if mini-future.
      */
@@ -450,7 +482,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 tx.system(),
                 tx.ioPolicy(),
                 tx.isSystemInvalidate(),
-                sync ? FULL_SYNC : tx.syncMode(),
+                sync || !commit ? FULL_SYNC : tx.syncMode(),
                 tx.completedBase(),
                 tx.committedVersions(),
                 tx.rolledbackVersions(),
@@ -483,8 +515,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                             ", node=" + n.id() + ']');
                     }
 
-                    if (sync)
-                        res = true;
+                    if (sync || !commit)
+                        res = true; // Force sync mode for rollback to prevent an issue with concurrent recovery.
                     else
                         fut.onDone();
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7502541..4a09ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -290,6 +290,11 @@ public interface IgniteInternalTx {
     public boolean markFinalizing(FinalizationStatus status);
 
     /**
+     * @return Finalization status.
+     */
+    public @Nullable FinalizationStatus finalizationStatus();
+
+    /**
      * @param cacheId Cache id.
      * @param part Invalid partition.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 4440174..fff1214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -631,7 +631,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /**
      * @return Finalization status.
      */
-    protected FinalizationStatus finalizationStatus() {
+    @Override @Nullable public FinalizationStatus finalizationStatus() {
         return finalizing;
     }
 
@@ -2321,6 +2321,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public FinalizationStatus finalizationStatus() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public void addInvalidPartition(int cacheId, int part) {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 5eee8ed..57c210b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1066,14 +1066,14 @@ public class IgniteTxHandler {
             tx.nearFinishMiniId(req.miniId());
             tx.storeEnabled(req.storeEnabled());
 
-            if (req.commit()) {
-                if (!tx.markFinalizing(USER_FINISH)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+            if (!tx.markFinalizing(USER_FINISH)) {
+                if (log.isDebugEnabled())
+                    log.debug("Will not finish transaction (it is handled by another thread) [commit=" + req.commit() + ", tx=" + tx + ']');
 
-                    return null;
-                }
+                return null;
+            }
 
+            if (req.commit()) {
                 IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitDhtLocalAsync();
 
                 // Only for error logging.
@@ -2273,6 +2273,10 @@ public class IgniteTxHandler {
     }
 
     /**
+     * Applies partition counter updates for transactions.
+     * <p>
+     * Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
+     *
      * @param counters Counters.
      */
     public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters)
@@ -2282,13 +2286,23 @@ public class IgniteTxHandler {
 
     /**
      * Applies partition counter updates for transactions.
+     * <p>
+     * Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
+     * <p>
+     * On rollback counters should be applied on the primary only after backup nodes, otherwise if the primary fail
+     * before sending rollback requests to backups remote transactions can be committed by recovery protocol and
+     * partition consistency will not be restored when primary returns to the grid because RollbackRecord was written
+     * (actual for persistent mode only).
      *
      * @param counters Counter values to be updated.
-     * @param rollback {@code True} if applied from rollbacks.
+     * @param rollback {@code True} if applied during rollbacks.
+     * @param rollbackOnPrimary {@code True} if rollback happens on primary node. Passed to CQ engine.
      */
-    public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters,
+    public void applyPartitionsUpdatesCounters(
+        Iterable<PartitionUpdateCountersMessage> counters,
         boolean rollback,
-        boolean rollbackOnPrimary) throws IgniteCheckedException {
+        boolean rollbackOnPrimary
+    ) throws IgniteCheckedException {
         if (counters == null)
             return;
 
@@ -2349,7 +2363,7 @@ public class IgniteTxHandler {
                         invalid = true;
                     }
 
-                    if (invalid && log.isDebugEnabled()) {
+                    if (log.isDebugEnabled() && invalid) {
                         log.debug("Received partition update counters message for invalid partition, ignoring: " +
                             "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + ']');
                     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a146ad5..18ccdea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1106,13 +1106,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         }
 
         if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
-            if (!txState.mvccEnabled()) {
-                TxCounters txCounters = txCounters(false);
-
-                if (txCounters != null)
-                    cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCounters.updateCounters(), true, true);
-            }
-
             cctx.tm().rollbackTx(this, clearThreadMap, skipCompletedVersions());
 
             cctx.mvccCaching().onTxFinished(this, false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a6fd06c..96381a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -2187,6 +2186,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 TransactionState state = tx.state();
 
                 if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
+                    if (state == PREPARED)
+                        tx.markFinalizing(RECOVERY_FINISH); // Prevents concurrent rollback.
+
                     if (--txNum == 0) {
                         if (fut != null)
                             fut.onDone(true);
@@ -2195,7 +2197,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     }
                 }
                 else {
-                    if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) {
+                    if (tx.setRollbackOnly() || tx.state() == UNKNOWN) {
                         tx.rollbackAsync();
 
                         if (log.isDebugEnabled())
@@ -2232,7 +2234,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 }
 
                 if (processedVers == null)
-                    processedVers = new HashSet<>(txNum, 1.0f);
+                    processedVers = U.newHashSet(txNum);
 
                 processedVers.add(tx.xidVersion());
             }
@@ -2281,12 +2283,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (log.isInfoEnabled())
             log.info("Finishing prepared transaction [commit=" + commit + ", tx=" + tx + ']');
 
-        if (!tx.markFinalizing(RECOVERY_FINISH)) {
-            if (log.isInfoEnabled())
-                log.info("Will not try to commit prepared transaction (could not mark finalized): " + tx);
-
-            return;
-        }
+        // Transactions participating in recovery can be finished only by recovery consensus.
+        assert tx.finalizationStatus() == RECOVERY_FINISH : tx;
 
         if (tx instanceof IgniteTxRemoteEx) {
             IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
@@ -2333,6 +2331,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         assert !F.isEmpty(tx.transactionNodes()) : tx;
         assert tx.nearXidVersion() != null : tx;
 
+        // Transaction will be completed by finish message.
+        if (!tx.markFinalizing(RECOVERY_FINISH))
+            return;
+
         GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
             cctx,
             tx,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index bf5220a..8ed4d7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -296,6 +296,16 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
+     * @param idx Stripe index.
+     * @return Queue size of specific stripe.
+     */
+    public int queueStripeSize(int idx) {
+        A.ensure(idx >= 0, "Stripe index should be non-negative: " + idx);
+
+        return stripes[idx % stripes.length].queueSize();
+    }
+
+    /**
      * @return Completed tasks count.
      */
     public long completedTasks() {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
new file mode 100644
index 0000000..9886bad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionState;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/**
+ * Tests that tx recovery racing with a concurrent tx rollback does not break tx atomicity. */
+public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest {
+    /** Backups. */
+    private int backups;
+
+    /** Whether native persistence is enabled for the default data region. */
+    private boolean persistence;
+
+    /** Cache write synchronization mode; stays {@code null} unless a test sets it. */
+    private CacheWriteSynchronizationMode syncMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId(name);
+
+        if (persistence) {
+            cfg.setDataStorageConfiguration(
+                new DataStorageConfiguration().
+                    setWalSegmentSize(4 * 1024 * 1024).
+                    setWalHistorySize(1000).
+                    setCheckpointFrequency(Integer.MAX_VALUE).
+                    setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(50 * 1024 * 1024)));
+        }
+
+        cfg.setActiveOnStart(false);
+        cfg.setClientMode("client".equals(name));
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+            setCacheMode(PARTITIONED).
+            setBackups(backups).
+            setAtomicityMode(TRANSACTIONAL).
+            setWriteSynchronizationMode(syncMode));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Enforces a specific order of message processing during a concurrent tx rollback and tx recovery triggered by
+     * the near (client) node leaving.
+     * <p>
+     * Expected result: both primary DHT transactions end up in the same state (COMMITTED is expected).
+     */
+    @Test
+    public void testRecoveryNotBreakingTxAtomicityOnNearFail() throws Exception {
+        backups = 1;
+        persistence = false;
+
+        final IgniteEx node0 = startGrids(3);
+        node0.cluster().active(true);
+
+        final Ignite client = startGrid("client");
+
+        final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        final List<Integer> g0Keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 100);
+        final List<Integer> g1Keys = primaryKeys(grid(1).cache(DEFAULT_CACHE_NAME), 100);
+
+        final List<Integer> g2BackupKeys = backupKeys(grid(2).cache(DEFAULT_CACHE_NAME), 100, 0);
+
+        Integer k1 = null;
+        Integer k2 = null;
+
+        for (Integer key : g2BackupKeys) {
+            if (g0Keys.contains(key))
+                k1 = key;
+            else if (g1Keys.contains(key))
+                k2 = key;
+
+            if (k1 != null && k2 != null)
+                break;
+        }
+
+        assertNotNull(k1);
+        assertNotNull(k2);
+
+        List<IgniteInternalTx> txs0 = null;
+        List<IgniteInternalTx> txs1 = null;
+
+        CountDownLatch stripeBlockLatch = new CountDownLatch(1);
+
+        int[] stripeHolder = new int[1];
+
+        try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+            cache.put(k1, Boolean.TRUE);
+            cache.put(k2, Boolean.TRUE);
+
+            TransactionProxyImpl p = (TransactionProxyImpl)tx;
+            p.tx().prepare(true);
+
+            txs0 = txs(grid(0));
+            txs1 = txs(grid(1));
+            List<IgniteInternalTx> txs2 = txs(grid(2));
+
+            assertEquals(1, txs0.size());
+            assertEquals(1, txs1.size());
+            assertEquals(2, txs2.size());
+
+            // Prevent recovery request for grid1 tx branch to go to grid0.
+            TestRecordingCommunicationSpi.spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name());
+            // Prevent finish(false) request processing on node0.
+            TestRecordingCommunicationSpi.spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name());
+
+            int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+
+            stripeHolder[0] = stripe;
+
+            // Blocks stripe processing for rollback request on node1.
+            grid(1).context().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(stripeBlockLatch));
+            // Dummy task to ensure msg is processed.
+            grid(1).context().getStripedExecutorService().execute(stripe, () -> {});
+
+            runAsync(() -> {
+                TestRecordingCommunicationSpi.spi(client).waitForBlocked();
+
+                client.close();
+
+                return null;
+            });
+
+            tx.rollback();
+
+            fail("Rollback is expected to fail because the near node is stopped concurrently");
+        }
+        catch (Exception ignored) {
+            // Expected: the near node is stopped while the rollback is in progress.
+        }
+
+        // Wait until tx0 is committed by recovery on node0.
+        assertNotNull(txs0);
+        try {
+            txs0.get(0).finishFuture().get(3_000);
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            // On timeout, the recovery message from g0 to g1 was mapped to the same stripe as the near finish request.
+            // Release the latch to let the stripe process both messages sequentially.
+            stripeBlockLatch.countDown();
+
+            // Wait until sequential processing is finished.
+            assertTrue("sequential processing", GridTestUtils.waitForCondition(() ->
+                grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
+
+            // Unblock recovery message from g1 to g0 because tx is in RECOVERY_FINISH state and waits for recovery end.
+            TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+
+            txs0.get(0).finishFuture().get();
+            txs1.get(0).finishFuture().get();
+
+            final TransactionState s1 = txs0.get(0).state();
+            final TransactionState s2 = txs1.get(0).state();
+
+            assertEquals(s1, s2);
+
+            return;
+        }
+
+        // Release rollback request processing, triggering an attempt to rollback the transaction during recovery.
+        stripeBlockLatch.countDown();
+
+        // Wait until finish message is processed.
+        assertTrue("concurrent processing", GridTestUtils.waitForCondition(() ->
+            grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
+
+        // Proceed with recovery on grid1 -> grid0. Tx0 is committed so tx1 also should be committed.
+        TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+
+        assertNotNull(txs1);
+        txs1.get(0).finishFuture().get();
+
+        final TransactionState s1 = txs0.get(0).state();
+        final TransactionState s2 = txs1.get(0).state();
+
+        assertEquals(s1, s2);
+    }
+
+    /** Near and primary node failure scenario with {@code FULL_SYNC} write synchronization. */
+    @Test
+    public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_FULL_SYNC() throws Exception {
+        doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(FULL_SYNC);
+    }
+
+    /** Near and primary node failure scenario with {@code PRIMARY_SYNC} write synchronization. */
+    @Test
+    public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_PRIMARY_SYNC() throws Exception {
+        doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(PRIMARY_SYNC);
+    }
+
+    /** Near and primary node failure scenario with {@code FULL_ASYNC} write synchronization. */
+    @Test
+    public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_FULL_ASYNC() throws Exception {
+        doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(FULL_ASYNC);
+    }
+
+    /**
+     * Stops the near and primary nodes after the primary tx is rolled back but before its finish requests reach the backups (persistence enabled).
+     * <p>Expected result: after the primary node is restarted, all partitions are consistent.
+     * @param syncMode Cache write synchronization mode.
+     */
+    private void doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(CacheWriteSynchronizationMode syncMode)
+        throws Exception {
+        backups = 2;
+        persistence = true;
+        this.syncMode = syncMode;
+
+        final IgniteEx node0 = startGrids(3);
+        node0.cluster().active(true);
+
+        final Ignite client = startGrid("client");
+
+        final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        final Integer pk = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+        IgniteInternalFuture<Void> fut = null;
+
+        List<IgniteInternalTx> txs0 = null;
+        List<IgniteInternalTx> txs2 = null;
+
+        try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+            cache.put(pk, Boolean.TRUE);
+
+            TransactionProxyImpl p = (TransactionProxyImpl)tx;
+            p.tx().prepare(true);
+
+            txs0 = txs(grid(0));
+            txs2 = txs(grid(2));
+            // Block DHT finish (rollback) requests sent by the primary (grid1) to both backups.
+            TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest);
+
+            fut = runAsync(() -> {
+                TestRecordingCommunicationSpi.spi(grid(1)).waitForBlocked(2);
+
+                client.close();
+                grid(1).close();
+
+                return null;
+            });
+
+            tx.rollback();
+        }
+        catch (Exception ignored) {
+            // Ignored: the rollback may fail because the near and primary nodes are stopped concurrently.
+        }
+        assertNotNull("Tx setup failed before the node-stopping future was started", fut);
+        fut.get();
+
+        final IgniteInternalTx tx0 = txs0.get(0);
+        tx0.finishFuture().get();
+
+        final IgniteInternalTx tx2 = txs2.get(0);
+        tx2.finishFuture().get();
+
+        assertPartitionsSame(idleVerify(grid(0), DEFAULT_CACHE_NAME));
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        assertPartitionsSame(idleVerify(grid(0), DEFAULT_CACHE_NAME));
+    }
+
+    /**
+     * @param g Grid.
+     * @return Snapshot copy of the active transactions known to the grid's transaction manager. */
+    private List<IgniteInternalTx> txs(IgniteEx g) {
+        return new ArrayList<>(g.context().cache().context().tm().activeTransactions());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index e135c64..12d9162 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.CheckpointBuff
 import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithPrimaryIndexCorruptionTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheMapOnInvalidTopologyTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheRemoteMultiplePartitionReservationTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRecoveryWithConcurrentRollbackTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -123,6 +124,8 @@ public class IgniteCacheTestSuite7 {
 
         GridTestUtils.addTestIfNeeded(suite, SafeLogTxFinishErrorTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, TxRecoveryWithConcurrentRollbackTest.class, ignoredTests);
+
         return suite;
     }
 }