You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/04/29 07:03:06 UTC

[ignite] branch master updated: IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d0da231  IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987.
d0da231 is described below

commit d0da231c7e8891d9fd45be3a245612e41b464e60
Author: zstan <st...@gmail.com>
AuthorDate: Thu Apr 29 09:56:07 2021 +0300

    IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../internal/pagemem/wal/record/DataEntry.java     |   1 +
 .../internal/pagemem/wal/record/WALRecord.java     |   3 +-
 .../GridCacheDatabaseSharedManager.java            |  33 +-
 .../persistence/checkpoint/CheckpointManager.java  |   4 +-
 .../cache/persistence/checkpoint/Checkpointer.java |  13 +-
 .../cache/transactions/IgniteTxAdapter.java        |  17 +-
 .../cache/transactions/IgniteTxLocalAdapter.java   |  11 +-
 .../cache/transactions/IgniteTxManager.java        |  66 +++-
 .../persistence/db/IgniteLogicalRecoveryTest.java  |   2 +-
 .../db/IgniteLogicalRecoveryWithParamsTest.java    | 368 +++++++++++++++++++++
 .../IgnitePdsWithIndexingCoreTestSuite.java        |   2 +
 11 files changed, 487 insertions(+), 33 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index dd05726..73de55b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -44,6 +44,7 @@ public class DataEntry {
     protected GridCacheOperation op;
 
     /** Near transaction version. */
+    @GridToStringInclude
     protected GridCacheVersion nearXidVer;
 
     /** Write version. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index f07b71a..8f49e5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -342,7 +342,8 @@ public abstract class WALRecord {
         PHYSICAL,
         /**
          * Logical records are needed to replay logical updates since last checkpoint.
-         * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(CheckpointStatus, org.apache.ignite.lang.IgnitePredicate, org.apache.ignite.lang.IgniteBiPredicate, boolean)}
+         * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(CheckpointStatus, org.apache.ignite.lang.IgnitePredicate,
+         * org.apache.ignite.lang.IgniteBiPredicate, boolean)}
          */
         LOGICAL,
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e6ca08c..42ff08d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
@@ -137,6 +138,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
@@ -183,6 +185,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
 import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
@@ -839,7 +842,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 metaStorage = createMetastorage(true);
 
-                applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), false);
+                applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), true);
 
                 fillWalDisabledGroups();
 
@@ -1945,9 +1948,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 status,
                 groupsWithEnabledWal(),
                 logicalRecords(),
-                true
+                false
             );
 
+            cctx.tm().clearUncommitedStates();
+
             if (recoveryVerboseLogging && log.isInfoEnabled()) {
                 log.info("Partition states information after LOGICAL RECOVERY phase:");
 
@@ -2598,6 +2603,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /**
      * @param status Last registered checkpoint status.
+     * @param restoreMeta Metastore restore phase if {@code true}.
      * @throws IgniteCheckedException If failed to apply updates.
      * @throws StorageException If IO exception occurred while reading write-ahead log.
      */
@@ -2605,13 +2611,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         CheckpointStatus status,
         IgnitePredicate<Integer> cacheGroupsPredicate,
         IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate,
-        boolean skipFieldLookup
+        boolean restoreMeta
     ) throws IgniteCheckedException {
         if (log.isInfoEnabled())
-            log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+            log.info("Applying lost " + (restoreMeta ? "metastore" : "cache") + " updates since last checkpoint record [lastMarked="
                 + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
 
-        if (skipFieldLookup)
+        if (!restoreMeta)
             cctx.kernalContext().query().skipFieldLookup(true);
 
         long start = U.currentTimeMillis();
@@ -2633,6 +2639,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         RestoreLogicalState restoreLogicalState =
             new RestoreLogicalState(status, it, lastArchivedSegment, cacheGroupsPredicate, partitionRecoveryStates);
 
+        final IgniteTxManager txManager = cctx.tm();
+
         try {
             while (it.hasNextX()) {
                 WALRecord rec = restoreLogicalState.next();
@@ -2641,6 +2649,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     break;
 
                 switch (rec.type()) {
+                    case TX_RECORD:
+                        if (restoreMeta) { // Also restore tx states.
+                            TxRecord txRec = (TxRecord)rec;
+
+                            txManager.collectTxStates(txRec);
+                        }
+
+                        break;
                     case CHECKPOINT_RECORD: // Calculate initial partition states
                         CheckpointRecord cpRec = (CheckpointRecord)rec;
 
@@ -2682,6 +2698,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         DataRecord dataRec = (DataRecord)rec;
 
                         for (DataEntry dataEntry : dataRec.writeEntries()) {
+                            if (!restoreMeta && txManager.uncommitedTx(dataEntry))
+                                continue;
+
                             int cacheId = dataEntry.cacheId();
 
                             DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheId);
@@ -2777,7 +2796,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         finally {
             it.close();
 
-            if (skipFieldLookup)
+            if (!restoreMeta)
                 cctx.kernalContext().query().skipFieldLookup(false);
         }
 
@@ -3465,7 +3484,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @return WAL records predicate that passes only Metastorage and encryption data records.
      */
     private IgniteBiPredicate<WALRecord.RecordType, WALPointer> onlyMetastorageAndEncryptionRecords() {
-        return (type, ptr) -> type == METASTORE_DATA_RECORD ||
+        return (type, ptr) -> type == METASTORE_DATA_RECORD || type == TX_RECORD ||
             type == MASTER_KEY_CHANGE_RECORD || type == MASTER_KEY_CHANGE_RECORD_V2;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 73b05d6..3d8c287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -327,9 +327,7 @@ public class CheckpointManager {
         checkpointMarkersStorage.cleanupCheckpointDirectory();
     }
 
-    /**
-     *
-     */
+    /** Current checkpointer implementation. */
     public Checkpointer getCheckpointer() {
         return checkpointer;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 1424a44..a7ac809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -120,7 +120,7 @@ public class Checkpointer extends GridWorker {
     private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
 
     /** Avoid the start checkpoint if checkpointer was canceled. */
-    private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
+    private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
 
     /** Long JVM pause threshold. */
     private final int longJvmPauseThreshold =
@@ -244,7 +244,7 @@ public class Checkpointer extends GridWorker {
                 checkpointWritePageThreads,
                 checkpointWritePageThreads,
                 30_000,
-                new LinkedBlockingQueue<Runnable>()
+                new LinkedBlockingQueue<>()
             );
 
         return null;
@@ -1007,4 +1007,13 @@ public class Checkpointer extends GridWorker {
     private boolean isShutdownNow() {
         return shutdownNow;
     }
+
+    /**
+     * Skip checkpoint on node stop.
+     *
+     * @param skip If {@code true} skips checkpoint on node stop.
+     */
+    public void skipCheckpointOnNodeStop(boolean skip) {
+        skipCheckpointOnNodeStop = skip;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 9daddb5..04db6cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -1228,7 +1227,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     seal();
 
                 if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) {
-                    cctx.tm().setMvccState(this, toMvccState(state));
+                    cctx.tm().setMvccState(this, state);
 
                     ptr = cctx.tm().logTxRecord(this);
                 }
@@ -1260,20 +1259,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** */
-    private byte toMvccState(TransactionState state) {
-        switch (state) {
-            case PREPARED:
-                return TxState.PREPARED;
-            case COMMITTED:
-                return TxState.COMMITTED;
-            case ROLLED_BACK:
-                return TxState.ABORTED;
-            default:
-                throw new IllegalStateException("Unexpected state: " + state);
-        }
-    }
-
-    /** */
     private void recordStateChangedEvent(TransactionState state) {
         if (!near() || !local()) // Covers only GridNearTxLocal's state changes.
             return;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7b6cd7f..50d07d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpda
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -937,7 +938,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
                         ", err=" + ex + ']');
 
-                    return;
+                    boolean persistenceEnabled = CU.isPersistenceEnabled(cctx.kernalContext().config());
+
+                    if (persistenceEnabled) {
+                        GridCacheDatabaseSharedManager dbManager = (GridCacheDatabaseSharedManager)cctx.database();
+
+                        dbManager.getCheckpointer().skipCheckpointOnNodeStop(true);
+                    }
+
+                    throw ex;
                 }
 
                 err = heuristicException(ex);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c3c07b5..c153caa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
@@ -53,6 +55,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -88,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOpti
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -198,6 +202,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private static final int SLOW_TX_WARN_TIMEOUT =
         Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, DFLT_SLOW_TX_WARN_TIMEOUT);
 
+    /** Returns {@code true} if the tx record state is {@code COMMITTED} or {@code ROLLED_BACK}. */
+    public static final Predicate<TxRecord> COMPLETED_TX_STATES = new Predicate<TxRecord>() {
+        @Override public boolean test(TxRecord txRec) {
+            return txRec.state() == COMMITTED || txRec.state() == ROLLED_BACK;
+        }
+    };
+
+    /** Returns {@code true} if the tx record state is {@code PREPARED} or {@code PREPARING}. */
+    public static final Predicate<TxRecord> PREPARED_TX_STATES = new Predicate<TxRecord>() {
+        @Override public boolean test(TxRecord txRec) {
+            return txRec.state() == PREPARED || txRec.state() == PREPARING;
+        }
+    };
+
+    /** Near xid versions of transactions seen as prepared but not yet committed or rolled back. */
+    private Set<GridCacheVersion> uncommitedTx = new HashSet<>();
+
     /** One phase commit deferred ack request timeout. */
     public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
         Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT,
@@ -2842,20 +2863,36 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param tx Transaction.
      * @param state New state.
      */
-    public void setMvccState(IgniteInternalTx tx, byte state) {
+    public void setMvccState(IgniteInternalTx tx, TransactionState state) {
         if (cctx.kernalContext().clientNode() || tx.mvccSnapshot() == null || tx.near() && !tx.local())
             return;
 
+        byte state0 = toMvccState(state);
+
         cctx.database().checkpointReadLock();
 
         try {
-            cctx.coordinators().updateState(tx.mvccSnapshot(), state, tx.local());
+            cctx.coordinators().updateState(tx.mvccSnapshot(), state0, tx.local());
         }
         finally {
             cctx.database().checkpointReadUnlock();
         }
     }
 
+    /** */
+    private byte toMvccState(TransactionState state) {
+        switch (state) {
+            case PREPARED:
+                return TxState.PREPARED;
+            case COMMITTED:
+                return TxState.COMMITTED;
+            case ROLLED_BACK:
+                return TxState.ABORTED;
+            default:
+                throw new IllegalStateException("Unexpected state: " + state);
+        }
+    }
+
     /**
      *  Finishes MVCC transaction.
      *  @param tx Transaction.
@@ -3764,4 +3801,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return 0;
         }
     }
+
+    /**
+     * Tracks near tx versions by their {@link TransactionState}: added when prepared, removed once completed.
+     *
+     * @param txRec tx Record.
+     */
+    public void collectTxStates(final TxRecord txRec) {
+        if (COMPLETED_TX_STATES.test(txRec))
+            uncommitedTx.remove(txRec.nearXidVersion());
+        else if (PREPARED_TX_STATES.test(txRec))
+            uncommitedTx.add(txRec.nearXidVersion());
+    }
+
+    /**
+     * @param dataEntry Processing entry.
+     * @return {@code true} if the entry's near tx version is tracked as not completed.
+     */
+    public boolean uncommitedTx(final DataEntry dataEntry) {
+        return uncommitedTx.contains(dataEntry.nearXidVersion());
+    }
+
+    /** Clears tx states collections. */
+    public void clearUncommitedStates() {
+        uncommitedTx = Collections.emptySet();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
index 8cc7c3f..3f8f853 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
@@ -480,7 +480,7 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
                 .collect(Collectors.toList());
 
             Assert.assertTrue("There was unexpected rebalance for some groups" +
-                    " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty());
+                " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty());
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java
new file mode 100644
index 0000000..aac00dc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP;
+import static org.apache.ignite.testframework.GridTestUtils.DFLT_BUSYWAIT_SLEEP_INTERVAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/**
+ * A set of tests that check correctness of logical recovery performed during node start.
+ */
+@RunWith(Parameterized.class)
+public class IgniteLogicalRecoveryWithParamsTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setCheckpointFrequency(1024 * 1024 * 1024) // Disable automatic checkpoints.
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setName("dflt")
+                    .setInitialSize(256 * 1024 * 1024)
+                    .setMaxSize(256 * 1024 * 1024)
+                    .setPersistenceEnabled(true)
+            );
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(spi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** Parametrized run param : server nodes. */
+    @Parameterized.Parameter(0)
+    public Integer numSrvNodes;
+
+    /** Parametrized run param : single node tx. */
+    @Parameterized.Parameter(1)
+    public Boolean singleNodeTx;
+
+    /** Parametrized run param : backups count. */
+    @Parameterized.Parameter(2)
+    public Integer backups;
+
+    /** Test run configurations: server nodes count, single node tx flag, backups count. */
+    @Parameterized.Parameters(name = "nodesCnt={0}, singleNodeTx={1}, backups={2}")
+    public static Collection<Object[]> runConfig() {
+        return Arrays.asList(new Object[][] {
+            {1, true, 0},
+            {1, true, 1},
+            {1, false, 0},
+            {1, false, 1},
+            {2, true, 0},
+            {2, true, 1},
+            {2, false, 0},
+            {2, false, 1},
+        });
+    }
+
+    /** Tests partially committed transactions with further recovery. */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, value = "true")
+    @WithSystemProperty(key = IGNITE_WAL_LOG_TX_RECORDS, value = "true")
+    public void testPartiallyCommitedTx_TwoNode_WithCpOnNodeStop_MultiNodeTx_OneBackup() throws Exception {
+        testPartiallyCommitedTx();
+    }
+
+    /** Tests partially committed transactions with further recovery. */
+    @Test
+    @WithSystemProperty(key = IGNITE_WAL_LOG_TX_RECORDS, value = "true")
+    public void testPartiallyCommitedTx_TwoNode_WithoutCpOnNodeStop_SingleNodeTx() throws Exception {
+        testPartiallyCommitedTx();
+    }
+
+    /**
+     * Starts a pessimistic transaction from a client node, stops all nodes while the commit is in flight and,
+     * after restart, checks that the transaction is visible atomically: either every key is present or none is.
+     *
+     * @throws Exception If failed.
+     */
+    private void testPartiallyCommitedTx() throws Exception {
+        final String cacheName = "recovery";
+
+        int itmsCount = 30_000;
+
+        // Shared with the WAL file IO factory of every node: once set, WAL writes start to fail.
+        AtomicBoolean failFileIO = new AtomicBoolean();
+
+        // Assigned exactly once inside the try block; read by the tx thread and by the checks after restart.
+        List<Integer> keys;
+
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<Integer, Integer>(cacheName)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(backups)
+            .setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        try {
+            final IgniteEx srv = (IgniteEx)startGridsMultiThreaded(numSrvNodes);
+
+            G.allGrids().forEach(n -> setWalIOFactory(n, failFileIO));
+
+            IgniteEx clnt = startClientGrid("client");
+
+            TestRecordingCommunicationSpi nearComm = TestRecordingCommunicationSpi.spi(clnt);
+
+            srv.cluster().state(ClusterState.ACTIVE);
+
+            final IgniteCache<Integer, Integer> cache = clnt.getOrCreateCache(cfg);
+
+            final CountDownLatch commitStart = new CountDownLatch(1);
+
+            forceCheckpoint();
+
+            // Hold prepare requests sent by the client until the test thread releases them.
+            nearComm.blockMessages((node, msg) -> msg instanceof GridNearTxPrepareRequest);
+
+            if (singleNodeTx)
+                keys = primaryKeys(srv.cache(cacheName), itmsCount, 0);
+            else
+                keys = IntStream.range(0, itmsCount).boxed().collect(Collectors.toList());
+
+            // NOTE(review): this thread is never joined; it is presumably expected to be interrupted by the
+            // node stop below while the commit is still in progress.
+            Thread t = new Thread(() -> {
+                try (Transaction tx = clnt.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                    keys.forEach(k -> cache.put(k, k));
+
+                    commitStart.countDown();
+
+                    tx.commit();
+                }
+            });
+
+            t.start();
+
+            commitStart.await();
+
+            nearComm.waitForBlocked();
+
+            nearComm.stopBlock();
+
+            // Proceed to the node stop only after some server node has written part of the tx into its WAL.
+            assertTrue(waitForWalUpdates(G.allGrids().stream().filter(g -> !g.configuration().isClientMode())
+                .collect(Collectors.toList())));
+        }
+        finally {
+            // Make further WAL writes fail, then stop all nodes.
+            failFileIO.set(true);
+
+            stopAllGrids(true);
+
+            assertTrue(G.allGrids().isEmpty());
+        }
+
+        final IgniteEx srv = (IgniteEx)startGridsMultiThreaded(numSrvNodes);
+
+        srv.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = srv.cache(cacheName);
+
+        int cSize = cache.size();
+
+        // State of the first key is the reference: every other key must be in the same state.
+        boolean pr = cache.get(keys.get(0)) == null;
+
+        for (int i : keys) {
+            Object res = cache.get(i);
+
+            // The guard avoids building the failure message for every matching key.
+            if (pr != (res == null))
+                assertEquals("ethalon=" + pr + ", current=" + res + ", key=" + i, pr, res == null);
+        }
+
+        // JUnit assertion instead of the 'assert' keyword, so the check does not depend on the -ea JVM flag.
+        assertTrue("unexpected cache size: " + cSize, cSize == itmsCount || cSize == 0);
+    }
+
+    /** */
+    private boolean waitForWalUpdates(Collection<Ignite> grids) throws IgniteInterruptedCheckedException {
+        long start = U.currentTimeMillis();
+
+        int[] offsets = new int[grids.size()];
+
+        int gCnt = 0;
+
+        for (Ignite grid : grids)
+            offsets[gCnt++] = getWalPos(grid);
+
+        while (true) {
+            gCnt = 0;
+
+            for (Ignite grid : grids) {
+                if (getWalPos(grid) - offsets[gCnt++] > 100)
+                    return true;
+            }
+
+            U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL / 2);
+
+            if (U.currentTimeMillis() - start > 20_000)
+                return false;
+        }
+    }
+
+    /**
+     * Replaces the file IO factory of the node's WAL manager with a {@link FailingFileIOFactory}.
+     *
+     * @param grid Ignite instance.
+     * @param canFail Flag handed over to the factory: while it is {@code true}, writes throw an exception.
+     */
+    private void setWalIOFactory(Ignite grid, AtomicBoolean canFail) {
+        FileWriteAheadLogManager wal =
+            (FileWriteAheadLogManager)((IgniteEx)grid).context().cache().context().wal();
+
+        FileIOFactory failingFactory = new FailingFileIOFactory(canFail);
+
+        wal.setFileIOFactory(failingFactory);
+    }
+
+    /**
+     * Reads the current position of the node's WAL write handle.
+     *
+     * @param grid Ignite instance.
+     * @return File offset of the current WAL write handle position.
+     */
+    private int getWalPos(Ignite grid) {
+        FileWriteAheadLogManager wal =
+            (FileWriteAheadLogManager)((IgniteEx)grid).context().cache().context().wal();
+
+        // The current handle is looked up by field name through U.field.
+        FileWriteHandle curHnd = U.field(wal, "currHnd");
+
+        try {
+            curHnd.fsync(null);
+        }
+        catch (IgniteCheckedException err) {
+            // A failed fsync is only logged; the position is still reported below.
+            U.warn(log, err);
+        }
+
+        return curHnd.position().fileOffset();
+    }
+
+    /**
+     * File I/O factory producing {@link FileIO} instances whose write operations start to fail
+     * once the shared flag is set.
+     */
+    private static class FailingFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Failure flag: while it is {@code true}, every write throws an {@link IOException}. */
+        private final AtomicBoolean fail;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /**
+         * @param fail Failure flag, checked on every write. If {@code null}, writes never fail.
+         */
+        FailingFileIOFactory(AtomicBoolean fail) {
+            this.fail = fail;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            final FileIO delegate = delegateFactory.create(file, modes);
+
+            return new FileIODecorator(delegate) {
+                /** {@inheritDoc} */
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
+                    failIfRequested();
+
+                    return super.write(srcBuf);
+                }
+
+                /** {@inheritDoc} */
+                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+                    failIfRequested();
+
+                    return delegate.write(srcBuf, position);
+                }
+
+                /** {@inheritDoc} */
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
+                    failIfRequested();
+
+                    return delegate.write(buf, off, len);
+                }
+
+                /** {@inheritDoc} */
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
+                }
+            };
+        }
+
+        /**
+         * Throws the simulated I/O error if the failure flag is present and set.
+         *
+         * @throws IOException If the failure flag is set.
+         */
+        private void failIfRequested() throws IOException {
+            if (fail != null && fail.get())
+                throw new IOException("No space left on device");
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 4f6b671..26e1df4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS
 import org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteLogicalRecoveryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgniteLogicalRecoveryWithParamsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteSequentialNodeCrashRecoveryTest;
@@ -93,6 +94,7 @@ import org.junit.runners.Suite;
     IgnitePdsCorruptedIndexTest.class,
 
     IgniteLogicalRecoveryTest.class,
+    IgniteLogicalRecoveryWithParamsTest.class,
 
     IgniteSequentialNodeCrashRecoveryTest.class,