You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/04/26 10:18:48 UTC

[ignite] 11/17: GG-17387 Transactional Datacenter Replication implemented

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-17462
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit f188175382dab58e0c04321659e9ff1d03019a64
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed Apr 24 20:44:55 2019 +0300

    GG-17387 Transactional Datacenter Replication implemented
    
    Contributors list:
    
    Aleksey Plekhanov <Pl...@gmail.com>
    Andrey Gura <ag...@apache.org>
    Andrey Kuznetsov <st...@gmail.com>
    Dmitriy Sorokin <d....@gmail.com>
    Ivan Daschinskiy <iv...@gmail.com>
    Ivan Rakov <iv...@gmail.com>
    Sergey Antonov <an...@gmail.com>
    Sergey Kosarev <sk...@gridgain.com>
    Vyacheslav Koptilin <sl...@gmail.com>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   2 +
 .../org/apache/ignite/internal/GridComponent.java  |   5 +-
 .../apache/ignite/internal/GridKernalContext.java  |   8 +
 .../ignite/internal/GridKernalContextImpl.java     |  12 +
 .../java/org/apache/ignite/internal/GridTopic.java |   5 +-
 .../org/apache/ignite/internal/IgniteKernal.java   |  21 +-
 .../ignite/internal/cluster/IgniteClusterImpl.java |  15 +-
 .../communication/GridIoMessageFactory.java        |   1 +
 .../managers/discovery/GridDiscoveryManager.java   |  20 +
 .../pagemem/wal/record/ConsistentCutRecord.java    |  26 +
 .../internal/pagemem/wal/record/DataEntry.java     |   5 +-
 .../internal/pagemem/wal/record/WALRecord.java     |   5 +-
 .../processors/cache/CacheGroupContext.java        |   1 +
 .../processors/cache/GridCacheMapEntry.java        |   2 +
 .../cache/GridCachePartitionExchangeManager.java   | 118 ++-
 .../internal/processors/cache/WalStateManager.java |   2 +-
 .../cache/binary/BinaryMetadataHolder.java         |   4 +-
 .../GridDistributedTxRemoteAdapter.java            |  23 +
 .../dht/GridDhtTopologyFutureAdapter.java          |  10 +-
 .../dht/IgniteClusterReadOnlyException.java        |  41 +
 .../distributed/dht/preloader/ExchangeType.java    |  30 +
 .../dht/preloader/GridDhtPartitionDemander.java    |   6 +-
 .../dht/preloader/GridDhtPartitionSupplier.java    |  45 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |  74 +-
 .../dht/preloader/GridDhtPreloader.java            |   6 +
 .../dht/preloader/PartitionsExchangeAware.java     |  37 +
 .../dht/topology/GridClientPartitionTopology.java  |   7 +-
 .../dht/topology/GridDhtPartitionTopology.java     |   8 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  29 +-
 .../near/GridNearTxPrepareFutureAdapter.java       |   1 +
 .../GridCacheDatabaseSharedManager.java            | 348 ++++++---
 .../cache/persistence/metastorage/MetaStorage.java |  12 +-
 .../cache/persistence/tree/BPlusTree.java          |  30 +-
 .../wal/AbstractWalRecordsIterator.java            |  11 +-
 .../persistence/wal/FileWriteAheadLogManager.java  |   4 +-
 .../cache/persistence/wal/aware/SegmentAware.java  |   6 +-
 .../wal/aware/SegmentCompressStorage.java          |  39 +-
 .../wal/reader/IgniteWalIteratorFactory.java       |  46 +-
 .../wal/reader/StandaloneGridKernalContext.java    |   6 +
 .../wal/reader/StandaloneWalRecordsIterator.java   |  11 +-
 .../wal/serializer/RecordDataV2Serializer.java     |  10 +
 .../wal/serializer/RecordSerializerFactory.java    |   5 +
 .../serializer/RecordSerializerFactoryImpl.java    |   8 +
 .../cache/transactions/IgniteTxLocalAdapter.java   |   3 +
 .../cache/transactions/IgniteTxManager.java        |  42 +-
 .../LocalPendingTransactionsTracker.java           | 590 ++++++++++++++
 .../cache/transactions/TrackCommittedResult.java   |  57 ++
 .../cache/version/GridCacheVersionManager.java     |  16 +-
 .../cluster/BaselineTopologyHistoryItem.java       |  20 +
 .../cluster/ChangeGlobalStateMessage.java          |   8 +
 .../cluster/GridClusterStateProcessor.java         | 234 ++++--
 .../txdr/NoOpTransactionalDrProcessor.java         |  82 ++
 .../processors/txdr/TransactionalDrProcessor.java  |  77 ++
 .../ignite/internal/util/GridCircularBuffer.java   |  68 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  37 +
 .../ignite/internal/util/StripedExecutor.java      |  10 +
 .../org/apache/ignite/logger/EchoingLogger.java    | 144 ++++
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |  10 +
 .../cache/ClusterReadOnlyModeAbstractTest.java     |  10 +
 .../processors/cache/ClusterReadOnlyModeTest.java  |  12 +-
 ...ocalWalModeChangeDuringRebalancingSelfTest.java |   2 +
 .../wal/IgniteNodeStoppedDuringDisableWALTest.java |   2 +-
 .../persistence/wal/aware/SegmentAwareTest.java    |  57 +-
 .../reader/StandaloneWalRecordsIteratorTest.java   | 213 ++++-
 .../LocalPendingTransactionsTrackerTest.java       | 853 +++++++++++++++++++++
 .../lang/utils/GridCircularBufferSelfTest.java     |  79 ++
 .../junits/common/GridCommonAbstractTest.java      |  16 +
 .../apache/ignite/util/GridCommandHandlerTest.java |   2 +-
 .../cache/ClusterReadOnlyModeSqlTest.java          |  16 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java    |  11 +
 .../spi/discovery/zk/internal/ZkRuntimeState.java  |   2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java        |   9 +
 .../zk/ZookeeperDiscoverySpiTestUtil.java          |  11 +-
 parent/pom.xml                                     |   2 +-
 74 files changed, 3462 insertions(+), 338 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 886e8c1..85b44f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -265,6 +265,8 @@ public final class IgniteSystemProperties {
      * System property to enable pending transaction tracker.
      * Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property:
      * if this property is set, WAL anyway won't be disabled during rebalancing triggered by baseline topology change.
+     * It is no longer necessary to set this property before grid start, because the pending transaction tracker is
+     * enabled automatically when needed; this property should be used for test purposes only.
      */
     public static final String IGNITE_PENDING_TX_TRACKER_ENABLED = "IGNITE_PENDING_TX_TRACKER_ENABLED";
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 6f6f8d1..5861665 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -75,7 +75,10 @@ public interface GridComponent {
         SERVICE_PROC,
 
         /** Distributed MetaStorage processor. */
-        META_STORAGE;
+        META_STORAGE,
+
+        /** Transactional data replication processor. */
+        TX_DR_PROC;
 
         /** Cached values array. */
         public static final DiscoveryDataExchangeType[] VALUES = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 6906ea0..c89831e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
 import org.apache.ignite.internal.stat.IoStatisticsManager;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
@@ -429,6 +430,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public GridSecurityProcessor security();
 
     /**
+     * Gets transactional data replication processor.
+     *
+     * @return Transactional data replication processor.
+     */
+    public TransactionalDrProcessor txDr();
+
+    /**
      * Gets load balancing manager.
      *
      * @return Load balancing manager.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4c900a2..e5fddfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
 import org.apache.ignite.internal.stat.IoStatisticsManager;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
@@ -321,6 +322,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    private TransactionalDrProcessor txDrProc;
+
+    /** */
+    @GridToStringExclude
     private List<GridComponent> comps = new LinkedList<>();
 
     /** */
@@ -672,6 +677,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             authProc = (IgniteAuthenticationProcessor)comp;
         else if (comp instanceof CompressionProcessor)
             compressProc = (CompressionProcessor)comp;
+        else if (comp instanceof TransactionalDrProcessor)
+            txDrProc = (TransactionalDrProcessor)comp;
         else if (!(comp instanceof DiscoveryNodeValidationProcessor
             || comp instanceof PlatformPluginProcessor))
             assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -845,6 +852,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public TransactionalDrProcessor txDr() {
+        return txDrProc;
+    }
+
+    /** {@inheritDoc} */
     @Override public GridLoadBalancerManager loadBalancing() {
         return loadMgr;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 4450166..2d25ef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -141,7 +141,10 @@ public enum GridTopic {
     TOPIC_SERVICES,
 
     /** */
-    TOPIC_DEADLOCK_DETECTION;
+    TOPIC_DEADLOCK_DETECTION,
+
+    /** */
+    TOPIC_TXDR;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 638755f..fa242cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -81,7 +81,6 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.MemoryMetrics;
 import org.apache.ignite.PersistenceMetrics;
 import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
@@ -170,6 +169,8 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.NoOpTransactionalDrProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
 import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
@@ -1031,6 +1032,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 startProcessor(new DistributedMetaStorageImpl(ctx));
                 startProcessor(new DistributedConfigurationProcessor(ctx));
 
+                // Start transactional data replication processor.
+                startProcessor(createComponent(TransactionalDrProcessor.class, ctx));
+
                 // Start plugins.
                 for (PluginProvider provider : ctx.plugins().allProviders()) {
                     ctx.add(new GridPluginComponent(provider));
@@ -3658,18 +3662,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         cluster().active(active);
     }
 
-    /** */
-    private Collection<BaselineNode> baselineNodes() {
-        Collection<ClusterNode> srvNodes = cluster().forServers().nodes();
-
-        ArrayList baselineNodes = new ArrayList(srvNodes.size());
-
-        for (ClusterNode clN : srvNodes)
-            baselineNodes.add(clN);
-
-        return baselineNodes;
-    }
-
     /** {@inheritDoc} */
     @Override public void resetLostPartitions(Collection<String> cacheNames) {
         CU.validateCacheNames(cacheNames);
@@ -4149,6 +4141,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (cls.equals(IGridClusterStateProcessor.class))
             return (T)new GridClusterStateProcessor(ctx);
 
+        if (cls.equals(TransactionalDrProcessor.class))
+            return (T)new NoOpTransactionalDrProcessor(ctx);
+
         Class<T> implCls = null;
 
         try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index 32691db..69f8f05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -1,12 +1,12 @@
 /*
  * Copyright 2019 GridGain Systems, Inc. and Contributors.
- * 
+ *
  * Licensed under the GridGain Community Edition License (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,8 +43,6 @@ import org.apache.ignite.cluster.ClusterGroupEmptyException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterStartNodeResult;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.ClusterActivationEvent;
-import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -479,7 +477,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
                     target.add(node);
             }
 
-            validateBeforeBaselineChange(target);
+            ctx.state().validateBeforeBaselineChange(target);
 
             ctx.state().changeGlobalState(true, target, true, isBaselineAutoAdjust).get();
         }
@@ -767,8 +765,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
 
             // If there is nothing to start, return finished future with empty result.
             if (nodeCallCnt == 0)
-                return new GridFinishedFuture<Collection<ClusterStartNodeResult>>(
-                    Collections.<ClusterStartNodeResult>emptyList());
+                return new GridFinishedFuture<>(Collections.emptyList());
 
             // Exceeding max line width for readability.
             GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut =
@@ -887,4 +884,4 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
     @Override public String toString() {
         return "IgniteCluster [igniteInstanceName=" + ctx.igniteInstanceName() + ']';
     }
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d716e84..31b06fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -1159,6 +1159,7 @@ public class GridIoMessageFactory implements MessageFactory {
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             // [2048..2053] - Snapshots
+            // [4096..4096] - TxDR
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index aa39f8f..a5725a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -24,6 +24,7 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -2343,6 +2344,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Sets grid start time by reflectively invoking {@code setGridStartTime(long)} on the discovery SPI.
+     *
+     * @param val New time value.
+     */
+    public void setGridStartTime(long val) {
+        DiscoverySpi spi = getSpi();
+
+        try {
+            spi.getClass().getMethod("setGridStartTime", long.class).invoke(spi, val);
+        }
+        catch (NoSuchMethodException e) {
+            U.error(log, "Discovery SPI has no 'setGridStartTime(long)' method [class=" + spi.getClass() + ']', e);
+        }
+        catch (IllegalAccessException | InvocationTargetException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param warning Warning message to be shown on all nodes.
      * @return Whether node is failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ConsistentCutRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ConsistentCutRecord.java
new file mode 100644
index 0000000..822efd6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ConsistentCutRecord.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.pagemem.wal.record;
+
+/**
+ * WAL record of type {@link RecordType#CONSISTENT_CUT}. TODO GG-13416: document the record's semantics.
+ */
+public class ConsistentCutRecord extends WALRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.CONSISTENT_CUT;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index add5b70..7c039e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -97,8 +97,9 @@ public class DataEntry {
         this.partId = partId;
         this.partCnt = partCnt;
 
-        // Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
-        assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
+        assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE ||
+            op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE :
+            "Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL [op=" + op + ']';
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 4406ffa..4751502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -207,7 +207,10 @@ public abstract class WALRecord {
         MVCC_DATA_RECORD (LOGICAL),
 
         /** Mvcc Tx state change record. */
-        MVCC_TX_RECORD (LOGICAL);
+        MVCC_TX_RECORD (LOGICAL),
+
+        /** Consistent cut record. */
+        CONSISTENT_CUT;
 
         /**
          * When you're adding a new record don't forget to choose record purpose explicitly
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index f1cfc85..582c444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -864,6 +864,7 @@ public class CacheGroupContext {
     }
 
     /**
+     * Tells whether this cache group is in recovery mode.
      * @return {@code True} if current cache group is in recovery mode.
      */
     public boolean isRecoveryMode() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4889153..81872fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -4332,6 +4332,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             else
                 op = this.val == null ? GridCacheOperation.CREATE : UPDATE;
 
+            cctx.tm().pendingTxsTracker().onKeysWritten(tx.nearXidVersion(), Collections.singletonList(key));
+
             return cctx.shared().wal().log(new DataRecord(new DataEntry(
                 cctx.cacheId(),
                 key,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ee81b06..c606a2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
@@ -260,6 +261,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Distributed latch manager. */
     private ExchangeLatchManager latchMgr;
 
+    /** List of exchange aware components. */
+    private final List<PartitionsExchangeAware> exchangeAwareComps = new ArrayList<>();
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -1146,6 +1150,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Registers component that will be notified on every partition map exchange.
+     *
+     * @param comp Component to be registered.
+     */
+    public void registerExchangeAwareComponent(PartitionsExchangeAware comp) {
+        exchangeAwareComps.add(new PartitionsExchangeAwareWrapper(comp));
+    }
+
+    /**
+     * Removes exchange aware component from list of listeners.
+     *
+     * @param comp Component to be unregistered.
+     */
+    public void unregisterExchangeAwareComponent(PartitionsExchangeAware comp) {
+        exchangeAwareComps.remove(new PartitionsExchangeAwareWrapper(comp));
+    }
+
+    /**
+     * @return List of registered exchange listeners.
+     */
+    public List<PartitionsExchangeAware> exchangeAwareComponents() {
+        return U.sealList(exchangeAwareComps);
+    }
+
+    /**
      * Partition refresh callback for selected cache groups.
      * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
      * for non coordinator -  {@link GridDhtPartitionsSingleMessage SingleMessages} send
@@ -1239,9 +1268,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     ) {
         long time = System.currentTimeMillis();
 
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null, grps);
-
-        m.topologyVersion(msgTopVer);
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null,
+            msgTopVer, null, null, null, grps);
 
         if (log.isInfoEnabled()) {
             long latency = System.currentTimeMillis() - time;
@@ -1306,13 +1334,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         boolean compress,
         boolean newCntrMap,
         @Nullable final GridDhtPartitionExchangeId exchId,
+        @Nullable AffinityTopologyVersion msgTopVer,
         @Nullable GridCacheVersion lastVer,
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
         @Nullable IgniteDhtPartitionsToReloadMap partsToReload
     ) {
         Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
 
-        return createPartitionsFullMessage(compress, newCntrMap, exchId, lastVer, partHistSuppliers, partsToReload, grps);
+        return createPartitionsFullMessage(compress, newCntrMap, exchId, msgTopVer, lastVer, partHistSuppliers, partsToReload, grps);
     }
 
     /**
@@ -1332,15 +1361,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         boolean compress,
         boolean newCntrMap,
         @Nullable final GridDhtPartitionExchangeId exchId,
+        @Nullable AffinityTopologyVersion msgTopVer,
         @Nullable GridCacheVersion lastVer,
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
         @Nullable IgniteDhtPartitionsToReloadMap partsToReload,
         Collection<CacheGroupContext> grps
     ) {
-        AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE;
+        assert (exchId != null) ^ (msgTopVer != null): "Topology version of full map message must be specified" +
+            " either via exchangeId=[" + exchId + "], or via msgTopVer=[" + msgTopVer + "].";
 
-        final GridDhtPartitionsFullMessage m =
-            new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload);
+        final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+            lastVer,
+            exchId != null ? exchId.topologyVersion() : msgTopVer,
+            partHistSuppliers,
+            partsToReload
+            );
 
         m.compress(compress);
 
@@ -1396,6 +1431,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
+        cctx.kernalContext().txDr().onPartitionsFullMessagePrepared(exchId, m);
+
         return m;
     }
 
@@ -3614,6 +3651,73 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Wrapper that logs exceptions thrown by the delegate instead of propagating them into the exchange thread.
+     */
+    private class PartitionsExchangeAwareWrapper implements PartitionsExchangeAware {
+        /** */
+        private final PartitionsExchangeAware delegate;
+
+        /** @param delegate Component to which every callback is forwarded. */
+        public PartitionsExchangeAwareWrapper(PartitionsExchangeAware delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onInitBeforeTopologyLock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onInitAfterTopologyLock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onDoneBeforeTopologyUnlock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onDoneAfterTopologyUnlock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return delegate.hashCode();
+        }
+
+        /**
+         * Compares delegates; a wrapper argument is unwrapped first so that {@code List.remove(wrapper)} can match.
+         */
+        @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+        @Override public boolean equals(Object obj) {
+            return delegate.equals(obj instanceof PartitionsExchangeAwareWrapper ?
+                ((PartitionsExchangeAwareWrapper)obj).delegate : obj);
+        }
+    }
+
+    /**
      * Class to limit action count for unique objects.
      * <p>
      * NO guarantees of thread safety are provided.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 2156d33..14f0399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -390,7 +390,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
      */
     public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, boolean changedBaseline) {
         if (changedBaseline
-            && IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED)
+            && cctx.tm().pendingTxsTracker().enabled()
             || !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, true))
             return;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
index f850d34..2a9834a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
@@ -38,8 +38,8 @@ final class BinaryMetadataHolder implements Serializable {
 
     /**
      * @param metadata Metadata.
-     * @param pendingVer Version of this metadata - how many updates were issued for this type.
-     * @param acceptedVer Pending updates count.
+     * @param pendingVer Pending updates count.
+     * @param acceptedVer Version of this metadata - how many updates were issued for this type.
      */
     BinaryMetadataHolder(BinaryMetadata metadata, int pendingVer, int acceptedVer) {
         assert metadata != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index d3b573c..3e24968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -775,6 +775,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                 .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
                                 .collect(Collectors.toList());
 
+                            logKeysToPendingTxsTracker(entriesWithCounters);
+
                             cctx.wal().log(new DataRecord(entriesWithCounters));
                         }
 
@@ -817,6 +819,27 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
     }
 
+    /**
+     * Reports the keys of the given data entries to the pending transactions tracker under the near XID version.
+     * Read and written keys are reported separately, each kind in a single batch.
+     *
+     * @param dataEntries Data entries.
+     */
+    private void logKeysToPendingTxsTracker(List<DataEntry> dataEntries) {
+        List<KeyCacheObject> readKeys = new ArrayList<>();
+        List<KeyCacheObject> writeKeys = new ArrayList<>();
+
+        // Split all keys first so that the tracker is notified once per kind instead of once per entry.
+        for (DataEntry dataEntry : dataEntries)
+            (dataEntry.op() == READ ? readKeys : writeKeys).add(dataEntry.key());
+
+        if (!readKeys.isEmpty())
+            cctx.tm().pendingTxsTracker().onKeysRead(nearXidVersion(), readKeys);
+
+        if (!writeKeys.isEmpty())
+            cctx.tm().pendingTxsTracker().onKeysWritten(nearXidVersion(), writeKeys);
+    }
+
     /** {@inheritDoc} */
     @Override public final void commitRemoteTx() throws IgniteCheckedException {
         if (optimistic())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index db097af..41e7fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -29,12 +29,15 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DEFAULT_VOLATILE_DS_GROUP_NAME;
 
 /**
  *
@@ -93,8 +96,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
 
         PartitionLossPolicy lossPlc = grp.config().getPartitionLossPolicy();
 
-        if (cctx.shared().readOnlyMode() && opType == WRITE)
-            return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)");
+        if (cctx.shared().readOnlyMode() && opType == WRITE && !isSystemCache(cctx.name())
+                && cctx.group().groupId() != CU.cacheId(DEFAULT_VOLATILE_DS_GROUP_NAME)) {
+            return new IgniteClusterReadOnlyException("Failed to perform cache operation (cluster is in read only mode) " +
+                    "[cacheGrp=" + cctx.group().name() + ", cache=" + cctx.name() + ']');
+        }
 
         if (grp.needsRecovery() && !recovery) {
             if (opType == WRITE && (lossPlc == READ_ONLY_SAFE || lossPlc == READ_ONLY_ALL))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteClusterReadOnlyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteClusterReadOnlyException.java
new file mode 100644
index 0000000..b83eb5c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteClusterReadOnlyException.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * This exception indicates that the cluster is in a read-only state.
+ */
+public class IgniteClusterReadOnlyException extends IgniteCheckedException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+    
+    /**
+     * Creates an exception without a detail message.
+     */
+    public IgniteClusterReadOnlyException() {
+    }
+
+    /**
+     * Creates a new exception with the given error message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteClusterReadOnlyException(String msg) {
+        super(msg);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ExchangeType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ExchangeType.java
new file mode 100644
index 0000000..f7d1e35
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ExchangeType.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+/**
+ * Type of a partitions exchange, as exposed by {@code GridDhtPartitionsExchangeFuture#exchangeType()}.
+ */
+public enum ExchangeType {
+    /** Presumably an exchange that involves client nodes only; TODO confirm. */
+    CLIENT,
+
+    /** Presumably a distributed exchange that involves all nodes; TODO confirm. */
+    ALL,
+
+    /** Presumably no partitions exchange is required; TODO confirm. */
+    NONE
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6b97678..47b9c0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -318,9 +318,8 @@ public class GridDhtPartitionDemander {
                     metrics.clearRebalanceCounters();
 
                     for (GridDhtPartitionDemandMessage msg : assignments.values()) {
-                        for (Integer partId : msg.partitions().fullSet()) {
+                        for (Integer partId : msg.partitions().fullSet())
                             metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
-                        }
 
                         CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap();
 
@@ -783,6 +782,9 @@ public class GridDhtPartitionDemander {
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
+                                if (ctx.kernalContext().txDr().shouldApplyUpdateCounterOnRebalance())
+                                    grp.offheap().onPartitionInitialCounterUpdated(p, supplyMsg.last().get(p));
+
                                 fut.partitionDone(nodeId, p, true);
 
                                 if (log.isDebugEnabled())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 59d8463..cd3ecc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -222,6 +222,8 @@ class GridDhtPartitionSupplier {
 
         SupplyContext sctx = null;
 
+        Map<Integer, Long> initUpdateCntrs;
+
         try {
             synchronized (scMap) {
                 sctx = scMap.remove(contextId);
@@ -274,12 +276,26 @@ class GridDhtPartitionSupplier {
             Set<Integer> remainingParts;
 
             if (sctx == null || sctx.iterator == null) {
-                iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
-
                 remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
 
+                Set<Integer> demandParts = new HashSet<>(demandMsg.partitions().fullSet());
+
                 CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap();
 
+                for (int i = 0; i < histMap.size(); ++i)
+                    demandParts.add(histMap.partitionAt(i));
+
+                initUpdateCntrs = new HashMap<>(demandParts.size());
+
+                for (Integer part : demandParts) {
+                    GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);
+
+                    if (loc != null && loc.state() == GridDhtPartitionState.OWNING)
+                        initUpdateCntrs.put(part, loc.updateCounter());
+                }
+
+                iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
+
                 for (int i = 0; i < histMap.size(); i++) {
                     int p = histMap.partitionAt(i);
 
@@ -311,6 +327,8 @@ class GridDhtPartitionSupplier {
                 iter = sctx.iterator;
 
                 remainingParts = sctx.remainingParts;
+
+                initUpdateCntrs = sctx.initUpdateCntrs;
             }
 
             final int msgMaxSize = grp.config().getRebalanceBatchSize();
@@ -332,7 +350,8 @@ class GridDhtPartitionSupplier {
                         saveSupplyContext(contextId,
                             iter,
                             remainingParts,
-                            demandMsg.rebalanceId()
+                            demandMsg.rebalanceId(),
+                            initUpdateCntrs
                         );
 
                         reply(topicId, demanderNode, demandMsg, supplyMsg, contextId);
@@ -393,7 +412,7 @@ class GridDhtPartitionSupplier {
                 }
 
                 if (iter.isPartitionDone(part)) {
-                    supplyMsg.last(part, loc.updateCounter());
+                    supplyMsg.last(part, initUpdateCntrs.get(part));
 
                     remainingParts.remove(part);
 
@@ -595,17 +614,19 @@ class GridDhtPartitionSupplier {
      * @param entryIt Entries rebalance iterator.
      * @param remainingParts Set of partitions that weren't sent yet.
      * @param rebalanceId Rebalance id.
+     * @param initUpdateCntrs Collection of update counters that corresponds to the beginning of rebalance.
      */
     private void saveSupplyContext(
         T3<UUID, Integer, AffinityTopologyVersion> contextId,
         IgniteRebalanceIterator entryIt,
         Set<Integer> remainingParts,
-        long rebalanceId
+        long rebalanceId,
+        Map<Integer, Long> initUpdateCntrs
     ) {
         synchronized (scMap) {
             assert scMap.get(contextId) == null;
 
-            scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId));
+            scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId, initUpdateCntrs));
         }
     }
 
@@ -623,17 +644,27 @@ class GridDhtPartitionSupplier {
         /** Rebalance id. */
         private final long rebalanceId;
 
+        /** Update counters for rebalanced partitions. */
+        private final Map<Integer, Long> initUpdateCntrs;
+
         /**
          * Constructor.
          *
          * @param iterator Entries rebalance iterator.
          * @param remainingParts Set of partitions which weren't sent yet.
          * @param rebalanceId Rebalance id.
+         * @param initUpdateCntrs Collection of update counters that corresponds to the beginning of rebalance.
          */
-        SupplyContext(IgniteRebalanceIterator iterator, Set<Integer> remainingParts, long rebalanceId) {
+        SupplyContext(
+            IgniteRebalanceIterator iterator,
+            Set<Integer> remainingParts,
+            long rebalanceId,
+            Map<Integer, Long> initUpdateCntrs
+        ) {
             this.iterator = iterator;
             this.remainingParts = remainingParts;
             this.rebalanceId = rebalanceId;
+            this.initUpdateCntrs = initUpdateCntrs;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4e223c4..a819b8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -104,6 +104,7 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -204,6 +205,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private AtomicBoolean added = new AtomicBoolean(false);
 
+    /** Exchange type. */
+    private volatile ExchangeType exchangeType;
+
     /**
      * Discovery event receive latch. There is a race between discovery event processing and single message
      * processing, so it is possible to create an exchange future before the actual discovery event is received.
@@ -496,6 +500,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return Exchange type, or {@code null} if it is not determined yet (it is assigned during exchange init).
+     */
+    public ExchangeType exchangeType() {
+        return exchangeType;
+    }
+
+    /**
      * Retreives the node which has WAL history since {@code cntrSince}.
      *
      * @param grpId Cache group ID.
@@ -852,6 +863,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             cctx.cache().registrateProxyRestart(resolveCacheRequests(exchActions), afterLsnrCompleteFut);
 
+            exchangeType = exchange;
+
+            for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+                comp.onInitBeforeTopologyLock(this);
+
             updateTopologies(crdNode);
 
             timeBag.finishGlobalStage("Determine exchange type");
@@ -899,6 +915,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
+            for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+                comp.onInitAfterTopologyLock(this);
+
             if (exchLog.isInfoEnabled())
                 exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
         }
@@ -1185,6 +1204,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 cctx.exchange().exchangerBlockingSectionBegin();
 
                 try {
+                    kctx.txDr().onDeActivate(cctx.kernalContext());
+
                     kctx.dataStructures().onDeActivate(kctx);
 
                     if (cctx.kernalContext().service() instanceof GridServiceProcessor)
@@ -1938,6 +1959,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             compress,
             newCntrMap,
             exchangeId(),
+            null,
             last != null ? last : cctx.versions().last(),
             partHistSuppliers,
             partsToReload);
@@ -2207,6 +2229,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (err == null)
                 cctx.coordinators().onExchangeDone(events().discoveryCache());
 
+            for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+                comp.onDoneBeforeTopologyUnlock(this);
+
             // Create and destory caches and cache proxies.
             cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
 
@@ -2308,6 +2333,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
+            for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+                comp.onDoneAfterTopologyUnlock(this);
+
             if (firstDiscoEvt instanceof DiscoveryCustomEvent)
                 ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
 
@@ -3150,7 +3178,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         top.globalPartSizes(partSizes);
 
-        Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory);
+        TransactionalDrProcessor txDrProc = cctx.kernalContext().txDr();
+
+        boolean skipResetOwners = txDrProc != null && txDrProc.shouldIgnoreAssignPartitionStates(this);
+
+        Set<UUID> joinedNodes = new HashSet<>();
+
+        for (DiscoveryEvent evt : events().events()) {
+            if (evt.type() == EVT_NODE_JOINED)
+                joinedNodes.add(evt.eventNode().id());
+        }
+
+        Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(
+            ownersByUpdCounters, haveHistory, joinedNodes, skipResetOwners);
 
         for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
             UUID nodeId = e.getKey();
@@ -3413,6 +3453,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     });
             }
 
+            TransactionalDrProcessor txDrProc = cctx.kernalContext().txDr();
+
+            boolean skipResetOwners = txDrProc != null && txDrProc.shouldIgnoreAssignPartitionStates(this);
+
             timeBag.finishGlobalStage("Affinity recalculation (crd)");
 
             Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size());
@@ -3469,9 +3513,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 timeBag.finishGlobalStage("Ideal affinity diff calculation (enforced)");
             }
 
-            for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
-                if (!grpCtx.isLocal())
-                    grpCtx.topology().applyUpdateCounters();
+            if (!skipResetOwners) {
+                for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
+                    if (!grpCtx.isLocal())
+                        grpCtx.topology().applyUpdateCounters();
+                }
             }
 
             timeBag.finishGlobalStage("Apply update counters");
@@ -3639,6 +3685,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         GridDhtPartitionsSingleMessage msg,
         Map<Integer, CacheGroupAffinityMessage> messageAccumulator
     ) {
+        TransactionalDrProcessor txDrProc = cctx.kernalContext().txDr();
+
+        boolean skipResetOwners = txDrProc != null && txDrProc.shouldIgnoreAssignPartitionStates(this);
+
         for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
             Integer grpId = e.getKey();
 
@@ -3650,7 +3700,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
 
-            if (cntrs != null)
+            if (cntrs != null && !skipResetOwners)
                 top.collectUpdateCounters(cntrs);
         }
 
@@ -5034,20 +5084,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      *
      */
-    enum ExchangeType {
-        /** */
-        CLIENT,
-
-        /** */
-        ALL,
-
-        /** */
-        NONE
-    }
-
-    /**
-     *
-     */
     private enum ExchangeLocalState {
         /** Local node is coordinator. */
         CRD,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 5181e90..2c99314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -182,6 +183,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (exchFut.localJoinExchange())
             return true; // Required, can have outdated updSeq partition counter if node reconnects.
 
+        TransactionalDrProcessor txDrProc = ctx.kernalContext().txDr();
+
+        if (txDrProc != null && txDrProc.shouldScheduleRebalance(exchFut))
+            return true;
+
         if (!grp.affinity().cachedVersions().contains(rebTopVer)) {
             assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 :
                 "Empty hisroty allowed only for newly started cache group [rebTopVer=" + rebTopVer +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
new file mode 100644
index 0000000..ffe5766
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+/**
+ * Callbacks invoked by {@link GridDhtPartitionsExchangeFuture} at fixed points of exchange init and completion.
+ */
+public interface PartitionsExchangeAware {
+    public default void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+        // No-op by default. Invoked from exchange init once the exchange type is set, before topologies are updated.
+    }
+
+    public default void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+        // No-op by default. Invoked at the end of exchange init, right before "Finished exchange init" is logged.
+    }
+
+    public default void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // No-op by default. Invoked on exchange completion, right before cctx.cache().onExchangeDone(...) is called.
+    }
+
+    public default void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // No-op by default. Invoked on exchange completion, right before the first event's custom message is cleared.
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index e9b8b6b..e3d06c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1161,7 +1161,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
+    @Override public Map<UUID, Set<Integer>> resetOwners(
+        Map<Integer, Set<UUID>> ownersByUpdCounters,
+        Set<Integer> haveHistory,
+        Set<UUID> joinedNodes,
+        boolean skipResetOwners
+    ) {
         Map<UUID, Set<Integer>> result = new HashMap<>();
 
         lock.writeLock().lock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 890693b..61a5408 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -423,9 +423,15 @@ public interface GridDhtPartitionTopology {
      * @param ownersByUpdCounters Map (partition, set of node IDs that have most actual state about partition
      *                            (update counter is maximal) and should hold OWNING state for such partition).
      * @param haveHistory Set of partitions which have WAL history to rebalance.
+     * @param joinedNodes Set of nodes that joined topology in case of distributed exchange.
+     * @param skipResetOwners If true, resetting owners will be skipped for all nodes except joined ones.
      * @return Map (nodeId, set of partitions that should be rebalanced <b>fully</b> by this node).
      */
-    public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory);
+    public Map<UUID, Set<Integer>> resetOwners(
+        Map<Integer, Set<UUID>> ownersByUpdCounters,
+        Set<Integer> haveHistory,
+        Set<UUID> joinedNodes,
+        boolean skipResetOwners);
 
     /**
      * Callback on exchange done.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index aed494d..fbddb1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -882,8 +882,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             GridDhtLocalPartition part = locParts.get(p);
 
-            if (part != null && part.state() != EVICTED)
-                return part;
+            if (part != null) {
+                if (part.state() != EVICTED)
+                    return part;
+                else
+                    part.awaitDestroy();
+            }
 
             part = new GridDhtLocalPartition(ctx, grp, p, true);
 
@@ -2187,7 +2191,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
+    @Override public Map<UUID, Set<Integer>> resetOwners(
+        Map<Integer, Set<UUID>> ownersByUpdCounters,
+        Set<Integer> haveHistory,
+        Set<UUID> joinedNodes,
+        boolean skipResetOwners
+    ) {
         Map<UUID, Set<Integer>> result = new HashMap<>();
 
         ctx.database().checkpointReadLock();
@@ -2197,6 +2206,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             try {
                 // First process local partitions.
+                UUID locNodeId = ctx.localNodeId();
+
                 for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
                     int part = entry.getKey();
                     Set<UUID> newOwners = entry.getValue();
@@ -2206,11 +2217,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (locPart == null || locPart.state() != OWNING)
                         continue;
 
-                    if (!newOwners.contains(ctx.localNodeId())) {
+                    if ((joinedNodes.contains(locNodeId) || !skipResetOwners) && !newOwners.contains(locNodeId)) {
                         rebalancePartition(part, haveHistory.contains(part));
 
-                        result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>());
-                        result.get(ctx.localNodeId()).add(part);
+                        result.computeIfAbsent(locNodeId, n -> new HashSet<>());
+                        result.get(locNodeId).add(part);
                     }
                 }
 
@@ -2221,6 +2232,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
                         UUID remoteNodeId = remotes.getKey();
+
+                        if (!joinedNodes.contains(remoteNodeId) && skipResetOwners)
+                            continue;
+
                         GridDhtPartitionMap partMap = remotes.getValue();
 
                         GridDhtPartitionState state = partMap.get(part);
@@ -2233,7 +2248,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
 
-                            if (partMap.nodeId().equals(ctx.localNodeId()))
+                            if (partMap.nodeId().equals(locNodeId))
                                 updateSeq.setIfGreater(partMap.updateSequence());
 
                             result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 0b75dfc..12793da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a300447..7251794 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -45,6 +45,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -59,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -121,6 +123,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
@@ -174,6 +177,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
@@ -224,7 +228,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
 
     /** Value of property for throttling policy override. */
-    private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
+    private final String throttlingPlcOverride = IgniteSystemProperties.getString(
         IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
 
     /** */
@@ -270,6 +274,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** */
     private static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
 
+    /** Throttle logging threshold. Warning will be raised if thread is being parked for this long. */
+    private static final long THROTTLE_LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(5);
+
+    /** Throttle queue size threshold. Async applying will be throttled starting from this queue size. */
+    private static final int THROTTLE_QUEUE_SIZE_THRESHOLD = 10_000;
+
     /** This number of threads will be created and used for parallel sorting. */
     private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
 
@@ -292,7 +302,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private long checkpointFreq;
 
     /** */
-    private CheckpointHistory cpHistory;
+    private CheckpointHistory cpHist;
 
     /** */
     private FilePageStoreManager storeMgr;
@@ -378,7 +388,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>();
 
     /** Initially local wal disabled groups. */
-    private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>();
+    private Collection<Integer> initiallyLocWalDisabledGrps = new HashSet<>();
 
     /** File I/O factory for writing checkpoint markers. */
     private final FileIOFactory ioFactory;
@@ -536,7 +546,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
 
-            cpHistory = new CheckpointHistory(kernalCtx);
+            cpHist = new CheckpointHistory(kernalCtx);
 
             IgnitePageStoreManager store = cctx.pageStore();
 
@@ -629,8 +639,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public void cleanupCheckpointDirectory() throws IgniteCheckedException {
-        if (cpHistory != null)
-            cpHistory = new CheckpointHistory(cctx.kernalContext());
+        if (cpHist != null)
+            cpHist = new CheckpointHistory(cctx.kernalContext());
 
         try {
             try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath())) {
@@ -1139,17 +1149,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
 
-        if (trackable)
+        if (trackable) {
             changeTracker = new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
                 @Override public void applyx(
                     Long page,
                     FullPageId fullId,
                     PageMemoryEx pageMem
                 ) throws IgniteCheckedException {
-                    if (trackable)
-                        snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
+                    snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
                 }
             };
+        }
         else
             changeTracker = null;
 
@@ -1241,13 +1251,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
             : PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;
 
-        if (throttlingPolicyOverride != null) {
+        if (throttlingPlcOverride != null) {
             try {
-                plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPolicyOverride.toUpperCase());
+                plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPlcOverride.toUpperCase());
             }
             catch (IllegalArgumentException e) {
                 log.error("Incorrect value of IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property. " +
-                    "The default throttling policy will be used [plc=" + throttlingPolicyOverride +
+                    "The default throttling policy will be used [plc=" + throttlingPlcOverride +
                     ", defaultPlc=" + plc + ']');
             }
         }
@@ -1731,7 +1741,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         checkpointReadLock();
 
         try {
-            earliestValidCheckpoints = cpHistory.searchAndReserveCheckpoints(applicableGroupsAndPartitions);
+            earliestValidCheckpoints = cpHist.searchAndReserveCheckpoints(applicableGroupsAndPartitions);
         }
         finally {
             checkpointReadUnlock();
@@ -1817,7 +1827,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
-        CheckpointEntry cpEntry = cpHistory.searchCheckpointEntry(grpId, partId, cntr);
+        CheckpointEntry cpEntry = cpHist.searchCheckpointEntry(grpId, partId, cntr);
 
         if (cpEntry == null)
             return false;
@@ -1898,7 +1908,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public WALPointer lastCheckpointMarkWalPointer() {
-        CheckpointEntry lastCheckpointEntry = cpHistory == null ? null : cpHistory.lastCheckpoint();
+        CheckpointEntry lastCheckpointEntry = cpHist == null ? null : cpHist.lastCheckpoint();
 
         return lastCheckpointEntry == null ? null : lastCheckpointEntry.checkpointMark();
     }
@@ -2380,7 +2390,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr, exec);
         }
 
-        cpHistory.initialize(retreiveHistory());
+        cpHist.initialize(retreiveHistory());
 
         return restoreBinaryState;
     }
@@ -2596,52 +2606,81 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * Apply update from some iterator and with specific filters.
      *
-     * @param it WalIterator.
-     * @param recPredicate Wal record filter.
-     * @param entryPredicate Entry filter.
+     * @param it WAL iterator.
+     * @param stopPred WAL record stop predicate.
+     * @param entryPred Entry filter.
      */
     public void applyUpdatesOnRecovery(
         @Nullable WALIterator it,
-        IgniteBiPredicate<WALPointer, WALRecord> recPredicate,
-        IgnitePredicate<DataEntry> entryPredicate
+        IgniteBiPredicate<WALPointer, WALRecord> stopPred,
+        IgniteBiPredicate<WALRecord, DataEntry> entryPred
     ) throws IgniteCheckedException {
         if (it == null)
             return;
 
         cctx.walState().runWithOutWAL(() -> {
-            while (it.hasNext()) {
-                IgniteBiTuple<WALPointer, WALRecord> next = it.next();
+            if (it != null)
+                applyUpdates(it, stopPred, entryPred, false, null, false);
+        });
+    }
 
-                WALRecord rec = next.get2();
+    /**
+     * Applies data entry updates from the given WAL iterator if the entry satisfies the provided predicates.
+     * @param it WAL iterator.
+     * @param stopPred WAL record predicate for stopping iteration.
+     * @param entryPred Data record and corresponding data entries predicate.
+     * @param lockEntries If true, update will be performed under entry lock.
+     * @param onWalPointerApplied Listener to be invoked after every WAL record is processed, or {@code null}.
+     * @param asyncApply If true, updates applying will be delegated to the striped executor.
+     */
+    public void applyUpdates(
+        WALIterator it,
+        @Nullable IgniteBiPredicate<WALPointer, WALRecord> stopPred,
+        IgniteBiPredicate<WALRecord, DataEntry> entryPred,
+        boolean lockEntries,
+        IgniteInClosure<WALPointer> onWalPointerApplied,
+        boolean asyncApply
+    ) {
+        StripedExecutor exec = cctx.kernalContext().getStripedExecutorService();
 
-                if (!recPredicate.apply(next.get1(), rec))
-                    break;
+        AtomicReference<IgniteCheckedException> applyError = new AtomicReference<>();
 
-                switch (rec.type()) {
-                    case MVCC_DATA_RECORD:
-                    case DATA_RECORD:
+        int[] stripesThrottleAccumulator = new int[exec.stripes()];
+
+        while (it.hasNext()) {
+            IgniteBiTuple<WALPointer, WALRecord> next = it.next();
+
+            WALRecord rec = next.get2();
+
+            if (stopPred != null && stopPred.apply(next.get1(), rec))
+                break;
+
+            switch (rec.type()) {
+                case MVCC_DATA_RECORD:
+                case DATA_RECORD:
+                    if (entryPred.apply(rec, null)) {
                         checkpointReadLock();
 
                         try {
                             DataRecord dataRec = (DataRecord)rec;
 
                             for (DataEntry dataEntry : dataRec.writeEntries()) {
-                                if (entryPredicate.apply(dataEntry)) {
-                                    checkpointReadLock();
-
-                                    try {
-                                        int cacheId = dataEntry.cacheId();
+                                if (entryPred.apply(rec, dataEntry)) {
+                                    int cacheId = dataEntry.cacheId();
 
-                                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                                        if (cacheCtx != null)
-                                            applyUpdate(cacheCtx, dataEntry);
-                                        else if (log != null)
-                                            log.warning("Cache is not started. Updates cannot be applied " +
-                                                "[cacheId=" + cacheId + ']');
+                                    if (cacheCtx != null) {
+                                        if (asyncApply) {
+                                            applyUpdateAsync(cacheCtx, dataEntry, lockEntries, exec,
+                                                applyError, stripesThrottleAccumulator);
+                                        }
+                                        else
+                                            applyUpdate(cacheCtx, dataEntry, lockEntries);
                                     }
-                                    finally {
-                                        checkpointReadUnlock();
+                                    else if (log != null) {
+                                        log.warning("Cache is not started. Updates cannot be applied " +
+                                            "[cacheId=" + cacheId + ']');
                                     }
                                 }
                             }
@@ -2652,10 +2691,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         finally {
                             checkpointReadUnlock();
                         }
+                    }
 
-                        break;
+                    break;
 
-                    case MVCC_TX_RECORD:
+                case MVCC_TX_RECORD:
+                    if (entryPred.apply(rec, null)) {
                         checkpointReadLock();
 
                         try {
@@ -2668,12 +2709,96 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         finally {
                             checkpointReadUnlock();
                         }
+                    }
 
-                        break;
+                    break;
 
-                    default:
-                        // Skip other records.
+                default:
+                    // Skip other records.
+            }
+
+            if (onWalPointerApplied != null)
+                onWalPointerApplied.apply(rec.position());
+        }
+
+        if (applyError.get() != null)
+            throw new IgniteException(applyError.get()); // Fail-fast check.
+        else {
+            CountDownLatch stripesClearLatch = new CountDownLatch(exec.stripes());
+
+            // We have to ensure that all asynchronous updates are done.
+            // StripedExecutor guarantees ordering inside a stripe, so it is enough to await "finishing" tasks.
+            for (int i = 0; i < exec.stripes(); i++)
+                exec.execute(i, stripesClearLatch::countDown);
+
+            try {
+                stripesClearLatch.await();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedException(e);
+            }
+        }
+
+        if (applyError.get() != null)
+            throw new IgniteException(applyError.get());
+    }
+
+    /**
+     * Applies update in given striped executor.
+     *
+     * @param cacheCtx Cache context.
+     * @param dataEntry Data entry.
+     * @param lockEntries If true, update will be performed under entry lock.
+     * @param exec Executor.
+     * @param applyError Apply error reference.
+     * @param stripesThrottleAccumulator Array with accumulated throttle powers for each stripe.
+     */
+    private void applyUpdateAsync(
+        GridCacheContext cacheCtx,
+        DataEntry dataEntry,
+        boolean lockEntries,
+        StripedExecutor exec,
+        AtomicReference<IgniteCheckedException> applyError,
+        int[] stripesThrottleAccumulator
+    ) throws IgniteCheckedException {
+        if (applyError.get() != null)
+            throw applyError.get();
+
+        int stripeIdx = dataEntry.partitionId() % exec.stripes();
+
+        assert stripeIdx >= 0 : "Stripe index should be non-negative: " + stripeIdx;
+
+        if (exec.queueSize(stripeIdx) > THROTTLE_QUEUE_SIZE_THRESHOLD) {
+            int throttlePower = ++stripesThrottleAccumulator[stripeIdx];
+
+            long throttleParkTimeNs = (long)(1000L * Math.pow(1.05, throttlePower));
+
+            if (throttleParkTimeNs > THROTTLE_LOGGING_THRESHOLD) {
+                U.warn(log, "Parking thread=" + Thread.currentThread().getName()
+                    + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+            }
+
+            LockSupport.parkNanos(throttleParkTimeNs);
+        }
+        else
+            stripesThrottleAccumulator[stripeIdx] = 0;
+
+        exec.execute(stripeIdx, () -> {
+            try {
+                if (applyError.get() != null)
+                    return;
+
+                checkpointReadLock();
+
+                try {
+                    applyUpdate(cacheCtx, dataEntry, lockEntries);
                 }
+                finally {
+                    checkpointReadUnlock();
+                }
+            }
+            catch (IgniteCheckedException e) {
+                applyError.compareAndSet(null, e);
             }
         });
     }
@@ -2741,7 +2866,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                                     cctx.kernalContext().query().markAsRebuildNeeded(cacheCtx);
 
                                 try {
-                                    applyUpdate(cacheCtx, dataEntry);
+                                    applyUpdate(cacheCtx, dataEntry, false);
                                 }
                                 catch (IgniteCheckedException e) {
                                     U.error(log, "Failed to apply data entry, dataEntry=" + dataEntry +
@@ -2859,18 +2984,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param highBound WALPointer.
      */
     public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
-        List<CheckpointEntry> removedFromHistory = cpHistory.onWalTruncated(highBound);
+        List<CheckpointEntry> rmvFromHist = cpHist.onWalTruncated(highBound);
 
-        for (CheckpointEntry cp : removedFromHistory)
+        for (CheckpointEntry cp : rmvFromHist)
             removeCheckpointFiles(cp);
     }
 
     /**
      * @param cacheCtx Cache context to apply an update.
      * @param dataEntry Data entry to apply.
+     * @param lockEntry If true, update will be performed under entry lock.
      * @throws IgniteCheckedException If failed to restore.
      */
-    private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
+    private void applyUpdate(
+        GridCacheContext cacheCtx,
+        DataEntry dataEntry,
+        boolean lockEntry
+    ) throws IgniteCheckedException {
         int partId = dataEntry.partitionId();
 
         if (partId == -1)
@@ -2878,51 +3008,85 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         GridDhtLocalPartition locPart = cacheCtx.isLocal() ? null : cacheCtx.topology().forceCreatePartition(partId);
 
+        GridCacheEntryEx entryEx = null;
+
         switch (dataEntry.op()) {
             case CREATE:
             case UPDATE:
-                if (dataEntry instanceof MvccDataEntry) {
-                    cacheCtx.offheap().mvccApplyUpdate(
-                        cacheCtx,
-                        dataEntry.key(),
-                        dataEntry.value(),
-                        dataEntry.writeVersion(),
-                        dataEntry.expireTime(),
-                        locPart,
-                        ((MvccDataEntry)dataEntry).mvccVer());
+                if (lockEntry) {
+                    entryEx = cacheCtx.isNear() ? cacheCtx.near().dht().entryEx(dataEntry.key()) :
+                        cacheCtx.cache().entryEx(dataEntry.key());
+
+                    entryEx.lockEntry();
                 }
-                else {
-                    cacheCtx.offheap().update(
-                        cacheCtx,
-                        dataEntry.key(),
-                        dataEntry.value(),
-                        dataEntry.writeVersion(),
-                        dataEntry.expireTime(),
-                        locPart,
-                        null);
+
+                try {
+                    if (dataEntry instanceof MvccDataEntry) {
+                        cacheCtx.offheap().mvccApplyUpdate(
+                            cacheCtx,
+                            dataEntry.key(),
+                            dataEntry.value(),
+                            dataEntry.writeVersion(),
+                            dataEntry.expireTime(),
+                            locPart,
+                            ((MvccDataEntry)dataEntry).mvccVer());
+                    }
+                    else {
+                        cacheCtx.offheap().update(
+                            cacheCtx,
+                            dataEntry.key(),
+                            dataEntry.value(),
+                            dataEntry.writeVersion(),
+                            dataEntry.expireTime(),
+                            locPart,
+                            null);
+                    }
+
+                    if (dataEntry.partitionCounter() != 0)
+                        cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
                 }
+                finally {
+                    if (lockEntry) {
+                        entryEx.unlockEntry();
 
-                if (dataEntry.partitionCounter() != 0)
-                    cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+                        entryEx.context().evicts().touch(entryEx);
+                    }
+                }
 
                 break;
 
             case DELETE:
-                if (dataEntry instanceof MvccDataEntry) {
-                    cacheCtx.offheap().mvccApplyUpdate(
-                        cacheCtx,
-                        dataEntry.key(),
-                        null,
-                        dataEntry.writeVersion(),
-                        0L,
-                        locPart,
-                        ((MvccDataEntry)dataEntry).mvccVer());
+                if (lockEntry) {
+                    entryEx = cacheCtx.isNear() ? cacheCtx.near().dht().entryEx(dataEntry.key()) :
+                        cacheCtx.cache().entryEx(dataEntry.key());
+
+                    entryEx.lockEntry();
                 }
-                else
-                    cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
 
-                if (dataEntry.partitionCounter() != 0)
-                    cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+                try {
+                    if (dataEntry instanceof MvccDataEntry) {
+                        cacheCtx.offheap().mvccApplyUpdate(
+                            cacheCtx,
+                            dataEntry.key(),
+                            null,
+                            dataEntry.writeVersion(),
+                            0L,
+                            locPart,
+                            ((MvccDataEntry)dataEntry).mvccVer());
+                    }
+                    else
+                        cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
+
+                    if (dataEntry.partitionCounter() != 0)
+                        cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+                }
+                finally {
+                    if (lockEntry) {
+                        entryEx.unlockEntry();
+
+                        entryEx.context().evicts().touch(entryEx);
+                    }
+                }
 
                 break;
 
@@ -3227,7 +3391,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         // Do not hold groups state in-memory if there is no space in the checkpoint history to prevent possible OOM.
         // In this case the actual group states will be readed from WAL by demand.
-        if (rec != null && cpHistory.hasSpace())
+        if (rec != null && cpHist.hasSpace())
             cacheGrpStates = rec.cacheGroupStates();
 
         return new CheckpointEntry(cpTs, ptr, cpId, cacheGrpStates);
@@ -3237,7 +3401,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @return Checkpoint history.
      */
     @Nullable public CheckpointHistory checkpointHistory() {
-        return cpHistory;
+        return cpHist;
     }
 
     /**
@@ -4038,7 +4202,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         cpRec,
                         CheckpointEntryType.START);
 
-                    cpHistory.addCheckpoint(cp);
+                    cpHist.addCheckpoint(cp);
                 }
             }
             finally {
@@ -4375,7 +4539,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 cctx.wal().notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
             }
 
-            List<CheckpointEntry> removedFromHistory = cpHistory.onCheckpointFinished(chp, truncateWalOnCpFinish);
+            List<CheckpointEntry> removedFromHistory = cpHist.onCheckpointFinished(chp, truncateWalOnCpFinish);
 
             for (CheckpointEntry cp : removedFromHistory)
                 removeCheckpointFiles(cp);
@@ -5166,7 +5330,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** {@inheritDoc} */
     @Override public boolean walEnabled(int grpId, boolean local) {
         if (local)
-            return !initiallyLocalWalDisabledGrps.contains(grpId);
+            return !initiallyLocWalDisabledGrps.contains(grpId);
         else
             return !initiallyGlobalWalDisabledGrps.contains(grpId);
     }
@@ -5216,7 +5380,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         checkpointReadLock();
 
         try {
-            CheckpointEntry lastCp = cpHistory.lastCheckpoint();
+            CheckpointEntry lastCp = cpHist.lastCheckpoint();
             long lastCpTs = lastCp != null ? lastCp.timestamp() : 0;
 
             if (lastCpTs != 0)
@@ -5242,7 +5406,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 if (t2 != null) {
                     if (t2.get2())
-                        initiallyLocalWalDisabledGrps.add(t2.get1());
+                        initiallyLocWalDisabledGrps.add(t2.get1());
                     else
                         initiallyGlobalWalDisabledGrps.add(t2.get1());
                 }
@@ -5412,7 +5576,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      */
     private IgnitePredicate<Integer> groupsWithEnabledWal() {
         return groupId -> !initiallyGlobalWalDisabledGrps.contains(groupId)
-            && !initiallyLocalWalDisabledGrps.contains(groupId);
+            && !initiallyLocWalDisabledGrps.contains(groupId);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index d583a04..ad1bc91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -452,11 +452,11 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
 
     /** */
     private void checkRootsPageIdFlag(long treeRoot, long reuseListRoot) throws StorageException {
-        if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA)
+        if (PageIdUtils.flag(treeRoot) != FLAG_DATA)
             throw new StorageException("Wrong tree root page id flag: treeRoot="
                 + U.hexLong(treeRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID);
 
-        if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA)
+        if (PageIdUtils.flag(reuseListRoot) != FLAG_DATA)
             throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
                 + U.hexLong(reuseListRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID);
     }
@@ -515,11 +515,11 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
                         //MetaStorage never encrypted so realPageSize == pageSize.
                         io.initNewPage(pageAddr, partMetaId, pageMem.pageSize());
 
-                        treeRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, PageMemory.FLAG_DATA);
-                        reuseListRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, PageMemory.FLAG_DATA);
+                        treeRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, FLAG_DATA);
+                        reuseListRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, FLAG_DATA);
 
-                        assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
-                        assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+                        assert PageIdUtils.flag(treeRoot) == FLAG_DATA;
+                        assert PageIdUtils.flag(reuseListRoot) == FLAG_DATA;
 
                         io.setTreeRoot(pageAddr, treeRoot);
                         io.setReuseListRoot(pageAddr, reuseListRoot);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index f1a26be..a86cc25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -1017,7 +1017,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
         }
         catch (RuntimeException | AssertionError e) {
-            throw new CorruptedTreeException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+            throw createCorruptedTreeException("Runtime failure on bounds: [lower=%s, upper=%s]", e, lower, upper);
         }
         finally {
             checkDestroyed();
@@ -1182,7 +1182,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new IgniteCheckedException("Runtime failure on first row lookup", e);
         }
         catch (RuntimeException | AssertionError e) {
-            throw new CorruptedTreeException("Runtime failure on first row lookup", e);
+            throw createCorruptedTreeException("Runtime failure on first row lookup", e);
         }
         finally {
             checkDestroyed();
@@ -1253,7 +1253,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new IgniteCheckedException("Runtime failure on lookup row: " + row, e);
         }
         catch (RuntimeException | AssertionError e) {
-            throw new CorruptedTreeException("Runtime failure on lookup row: " + row, e);
+            throw createCorruptedTreeException("Runtime failure on lookup row: %s", e, row);
         }
         finally {
             checkDestroyed();
@@ -1813,7 +1813,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
         }
         catch (RuntimeException | AssertionError e) {
-            throw new CorruptedTreeException("Runtime failure on search row: " + row, e);
+            throw createCorruptedTreeException("Runtime failure on search row: %s", e, row);
         }
         finally {
             x.releaseAll();
@@ -1970,7 +1970,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
         }
         catch (RuntimeException | AssertionError e) {
-            throw new CorruptedTreeException("Runtime failure on search row: " + row, e);
+            throw createCorruptedTreeException("Runtime failure on search row: %s", e, row);
         }
         finally {
             r.releaseAll();
@@ -2322,7 +2322,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new IgniteCheckedException("Runtime failure on row: " + row, e);
         }
         catch (RuntimeException | AssertionError e) {
-            throw new CorruptedTreeException("Runtime failure on row: " + row, e);
+            throw createCorruptedTreeException("Runtime failure on row: %s", e, row);
         }
         finally {
             checkDestroyed();
@@ -5835,4 +5835,22 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     protected IoStatisticsHolder statisticsHolder() {
         return IoStatisticsHolderNoOp.INSTANCE;
     }
+
+    /**
+     * Creates a new instance of {@link CorruptedTreeException}.
+     *
+     * @param message Error message template in {@link String#format(String, Object...)} syntax.
+     * @param cause The cause.
+     * @param rows Optional arguments substituted into the message template.
+     * @return New instance of {@link CorruptedTreeException}.
+     */
+    private CorruptedTreeException createCorruptedTreeException(String message, Throwable cause, Object... rows) {
+        try {
+            return new CorruptedTreeException(String.format(message, rows), cause);
+        }
+        catch (Throwable t) {
+            // Formatting the arguments failed: keep the raw template rather than losing the message entirely.
+            return new CorruptedTreeException(message, cause);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index f744b3f..9af42d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -310,7 +310,7 @@ public abstract class AbstractWalRecordsIterator
      * @param desc File descriptor.
      * @param start Optional start pointer. Null means read from the beginning.
      * @param fileIO fileIO associated with file descriptor
-     * @param segmentHeader read segment header from fileIO
+     * @param segmentHdr read segment header from fileIO
      * @return Initialized file read header.
      * @throws IgniteCheckedException If initialized failed due to another unexpected error.
      */
@@ -318,10 +318,10 @@ public abstract class AbstractWalRecordsIterator
         @NotNull final AbstractFileDescriptor desc,
         @Nullable final FileWALPointer start,
         @NotNull final SegmentIO fileIO,
-        @NotNull final SegmentHeader segmentHeader
+        @NotNull final SegmentHeader segmentHdr
     ) throws IgniteCheckedException {
         try {
-            boolean isCompacted = segmentHeader.isCompacted();
+            boolean isCompacted = segmentHdr.isCompacted();
 
             if (isCompacted)
                 serializerFactory.skipPositionCheck(true);
@@ -341,7 +341,7 @@ public abstract class AbstractWalRecordsIterator
                 }
             }
 
-            int serVer = segmentHeader.getSerializerVersion();
+            int serVer = segmentHdr.getSerializerVersion();
 
             return createReadFileHandle(fileIO, serializerFactory.createSerializer(serVer), in);
         }
@@ -366,6 +366,9 @@ public abstract class AbstractWalRecordsIterator
             throw new IgniteCheckedException(
                 "Failed to initialize WAL segment after reading segment header: " + desc.file().getAbsolutePath(), e);
         }
+        finally {
+            serializerFactory.clearSegmentLocalState();
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 094ade7..3aa85ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.sql.Time;
@@ -127,7 +126,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.CREATE_NEW;
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
@@ -449,7 +447,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     }
                 });
 
-            segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled());
+            segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled(), log);
 
             if (isArchiverEnabled())
                 archiver = new FileArchiver(segmentAware, log);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 3281d7c..7e01728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
 
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 
 import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage;
@@ -40,10 +41,11 @@ public class SegmentAware {
     /**
      * @param walSegmentsCnt Total WAL segments count.
      * @param compactionEnabled Is wal compaction enabled.
+     * @param log Logger.
      */
-    public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) {
+    public SegmentAware(int walSegmentsCnt, boolean compactionEnabled, IgniteLogger log) {
         segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage);
-        segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled);
+        segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled, log);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 458f119..c565bf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -16,11 +16,12 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
 
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 
 /**
  * Storage of actual information about current index of compressed segments.
@@ -53,25 +54,37 @@ public class SegmentCompressStorage {
     /** Min uncompressed index to keep. */
     private volatile long minUncompressedIdxToKeep = -1L;
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /**
      * @param segmentArchivedStorage Storage of last archived segment.
      * @param compactionEnabled If WAL compaction enabled.
+     * @param log Logger.
      */
-    private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage, boolean compactionEnabled) {
+    private SegmentCompressStorage(
+        SegmentArchivedStorage segmentArchivedStorage,
+        boolean compactionEnabled,
+        IgniteLogger log) {
         this.segmentArchivedStorage = segmentArchivedStorage;
 
         this.compactionEnabled = compactionEnabled;
 
         this.segmentArchivedStorage.addObserver(this::onSegmentArchived);
+
+        this.log = log;
     }
 
     /**
      * @param segmentArchivedStorage Storage of last archived segment.
      * @param compactionEnabled If WAL compaction enabled.
+     * @param log Logger.
      */
-    static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage,
-                                                       boolean compactionEnabled) {
-        SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled);
+    static SegmentCompressStorage buildCompressStorage(
+        SegmentArchivedStorage segmentArchivedStorage,
+        boolean compactionEnabled,
+        IgniteLogger log) {
+        SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled, log);
 
         segmentArchivedStorage.addObserver(storage::onSegmentArchived);
 
@@ -79,11 +92,22 @@ public class SegmentCompressStorage {
     }
 
     /**
+     * Records {@code idx} as the last segment enqueued for compression and reports it as compressed.
+     *
+     * @param idx Segment index.
+     */
+    synchronized void lastSegmentCompressed(long idx) {
+        onSegmentCompressed(lastEnqueuedToCompressIdx = idx);
+    }
+
+    /**
      * Callback after segment compression finish.
      *
      * @param compressedIdx Index of compressed segment.
      */
     synchronized void onSegmentCompressed(long compressedIdx) {
+        log.info("Segment compressed notification [idx=" + compressedIdx + ']');
+
         if (compressedIdx > lastMaxCompressedIdx)
             lastMaxCompressedIdx = compressedIdx;
 
@@ -150,8 +174,11 @@ public class SegmentCompressStorage {
      * Callback for waking up compressor when new segment is archived.
      */
     private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
-        while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled)
+        while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) {
+            log.info("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');
+
             segmentsToCompress.add(++lastEnqueuedToCompressIdx);
+        }
 
         notifyAll();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index e3f1499..9e4305a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -22,13 +22,16 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteOrder;
 import java.nio.file.FileVisitResult;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.TreeSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -265,20 +268,24 @@ public class IgniteWalIteratorFactory {
         if (filesOrDirs == null || filesOrDirs.length == 0)
             return Collections.emptyList();
 
-        final FileIOFactory ioFactory = iteratorParametersBuilder.ioFactory;
-
         final TreeSet<FileDescriptor> descriptors = new TreeSet<>();
 
         for (File file : filesOrDirs) {
             if (file.isDirectory()) {
                 try {
                     walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
-                        @Override
-                        public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
-                            addFileDescriptor(path.toFile(), ioFactory, descriptors);
+                        @Override public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
+                            addFileDescriptor(path.toFile(), descriptors, iteratorParametersBuilder);
 
                             return FileVisitResult.CONTINUE;
                         }
+
+                        @Override public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+                            if (exc instanceof NoSuchFileException)
+                                return FileVisitResult.CONTINUE;
+
+                            return super.visitFileFailed(file, exc);
+                        }
                     });
                 }
                 catch (IOException e) {
@@ -288,31 +295,42 @@ public class IgniteWalIteratorFactory {
                 continue;
             }
 
-            addFileDescriptor(file, ioFactory, descriptors);
+            addFileDescriptor(file, descriptors, iteratorParametersBuilder);
         }
 
         return new ArrayList<>(descriptors);
     }
 
     /**
+     * @param file File.
+     * @param descriptors Collection that receives the file's descriptor.
+     * @param params Iterator parameters supplying the IO factory and the segment index bounds.
+     */
+    private void addFileDescriptor(
+        File file,
+        Collection<FileDescriptor> descriptors,
+        IteratorParametersBuilder params)
+    {
+        Optional.ofNullable(getFileDescriptor(file, params.ioFactory))
+            .filter(desc -> desc.idx() >= params.lowBound.index() && desc.idx() <= params.highBound.index())
+            .ifPresent(descriptors::add);
+    }
+
+    /**
      * @param file File.
      * @param ioFactory IO factory.
-     * @param descriptors List of descriptors.
      */
-    private void addFileDescriptor(File file, FileIOFactory ioFactory, TreeSet<FileDescriptor> descriptors) {
+    private FileDescriptor getFileDescriptor(File file, FileIOFactory ioFactory) {
         if (file.length() < HEADER_RECORD_SIZE)
-            return; // Filter out this segment as it is too short.
+            return null; // Filter out this segment as it is too short.
 
         String fileName = file.getName();
 
         if (!WAL_NAME_PATTERN.matcher(fileName).matches() &&
             !WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(fileName).matches())
-            return;  // Filter out this because it is not segment file.
-
-        FileDescriptor desc = readFileDescriptor(file, ioFactory);
+            return null;  // Filter out this because it is not segment file.
 
-        if (desc != null)
-            descriptors.add(desc);
+        return readFileDescriptor(file, ioFactory);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index f14bcac..293b961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
 import org.apache.ignite.internal.stat.IoStatisticsManager;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
@@ -463,6 +464,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     }
 
     /** {@inheritDoc} */
+    @Override public TransactionalDrProcessor txDr() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public GridLoadBalancerManager loadBalancing() {
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 4044f3a..5bd5a21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -283,9 +283,14 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
         if (tup == null)
             return tup;
 
-        if (!checkBounds(tup.get1())) {
+        if (tup.get2() instanceof FilteredRecord)
+            return new T2<>(tup.get1(), FilteredRecord.INSTANCE);
+
+        WALPointer originalPtr = tup.get2().position();
+
+        if (!checkBounds(originalPtr)) {
             if (curRec != null) {
-                FileWALPointer prevRecPtr = (FileWALPointer)curRec.get1();
+                FileWALPointer prevRecPtr = (FileWALPointer)curRec.get2().position();
 
                 // Fast stop condition, after high bound reached.
                 if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0)
@@ -471,7 +476,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
      * @param keepBinary Don't convert non primitive types.
      * @return Unwrapped entry.
      */
-    private DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
+    private @NotNull DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
         KeyCacheObject key, CacheObject val, boolean keepBinary) {
         if (dataEntry instanceof MvccDataEntry)
             return new UnwrapMvccDataEntry(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 5e84253..f8285d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.ConsistentCutRecord;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
@@ -108,6 +109,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
             case MVCC_TX_RECORD:
                 return txRecordSerializer.size((MvccTxRecord)rec);
 
+            case CONSISTENT_CUT:
+                return 0;
+
             default:
                 return super.plainSize(rec);
         }
@@ -192,6 +196,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
             case MVCC_TX_RECORD:
                 return txRecordSerializer.readMvccTx(in);
 
+            case CONSISTENT_CUT:
+                return new ConsistentCutRecord();
+
             default:
                 return super.readPlainRecord(type, in, encrypted);
         }
@@ -274,6 +281,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
 
                 break;
 
+            case CONSISTENT_CUT:
+                break;
+
             default:
                 super.writePlainRecord(rec, buf);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
index 923949c..6921b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
@@ -69,4 +69,9 @@ public interface RecordSerializerFactory {
      * @param skipPositionCheck Skip position check.
      */
     public RecordSerializerFactory skipPositionCheck(boolean skipPositionCheck);
+
+    /**
+     * Clears factory parameters that are relevant only to a specific WAL segment.
+     */
+    public RecordSerializerFactory clearSegmentLocalState();
 }
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
index bf9c3cd..4a93d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
@@ -148,4 +148,12 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory {
 
         return this;
     }
+
+    /** {@inheritDoc} */
+    @Override public RecordSerializerFactory clearSegmentLocalState() {
+        skipPositionCheck = false;
+        recordDeserializeFilter = null;
+
+        return this;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ed8923b..9c89cfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -835,6 +835,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
                                     if (grp.persistenceEnabled() && grp.walEnabled() &&
                                         cctx.snapshot().needTxReadLogging()) {
+                                        cctx.tm().pendingTxsTracker().onKeysRead(
+                                            nearXidVersion(), Collections.singletonList(txEntry.key()));
+
                                         ptr = cctx.wal().log(new DataRecord(new DataEntry(
                                             cacheCtx.cacheId(),
                                             txEntry.key(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 12ea9d9..ed8f010 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -232,7 +232,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private TxDeadlockDetection txDeadlockDetection;
 
     /** Flag indicates that {@link TxRecord} records will be logged to WAL. */
-    private boolean logTxRecords;
+    private volatile boolean logTxRecords;
+
+    /** Pending transactions tracker. */
+    private LocalPendingTransactionsTracker pendingTracker;
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
@@ -307,6 +310,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
 
+        this.pendingTracker = new LocalPendingTransactionsTracker(cctx);
+
+        // todo gg-13416 unhardcode
         this.logTxRecords = IgniteSystemProperties.getBoolean(IGNITE_WAL_LOG_TX_RECORDS, false);
     }
 
@@ -2499,6 +2505,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Pending transactions tracker.
+     */
+    public LocalPendingTransactionsTracker pendingTxsTracker() {
+        return pendingTracker;
+    }
+
+    /**
+     * Turns on the pending transactions tracker and, when {@link TxRecord} logging
+     * to WAL is currently off, switches it on as well and warns about the change.
+     */
+    public void trackPendingTxs() {
+        pendingTracker.enable();
+
+        if (logTxRecords)
+            return; // TX records are already being logged, nothing to switch.
+
+        logTxRecords = true;
+        U.warn(log, "Transaction wal logging is enabled, because pending transaction tracker is enabled.");
+    }
+
+    /**
      * Sets MVCC state.
      *
      * @param tx Transaction.
@@ -2555,7 +2582,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             record = new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
 
         try {
-            return cctx.wal().log(record);
+            WALPointer ptr = cctx.wal().log(record);
+
+            TransactionState txState = tx.state();
+
+            if (txState == PREPARED)
+                cctx.tm().pendingTxsTracker().onTxPrepared(tx.nearXidVersion());
+            else if (txState == ROLLED_BACK)
+                cctx.tm().pendingTxsTracker().onTxRolledBack(tx.nearXidVersion());
+            else
+                cctx.tm().pendingTxsTracker().onTxCommitted(tx.nearXidVersion());
+
+            return ptr;
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to log TxRecord: " + record, e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTracker.java
new file mode 100644
index 0000000..3aa179e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTracker.java
@@ -0,0 +1,590 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.GridKernalState;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED;
+
+/**
+ * Tracks pending transactions for purposes of consistent cut algorithm.
+ */
+public class LocalPendingTransactionsTracker {
+    /** Cctx. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Currently committing transactions. */
+    private final Set<GridCacheVersion> currentlyCommittingTxs = U.newConcurrentHashSet();
+
+    /** Tracker enabled. */
+    private volatile boolean enabled = IgniteSystemProperties.getBoolean(IGNITE_PENDING_TX_TRACKER_ENABLED, false);
+
+    /** Currently prepared transactions. Counters are incremented on prepare, decremented on commit/rollback. */
+    private final ConcurrentHashMap<GridCacheVersion, Integer> preparedCommittedTxsCounters = new ConcurrentHashMap<>();
+
+    /**
+     * Transactions that were transitioned to pending state since last {@link #startTrackingPrepared()} call.
+     * Transaction remains in this map after commit/rollback.
+     */
+    private volatile GridConcurrentHashSet<GridCacheVersion> trackedPreparedTxs = new GridConcurrentHashSet<>();
+
+    /** Transactions that were transitioned to committed state since last {@link #startTrackingCommitted()} call. */
+    private volatile GridConcurrentHashSet<GridCacheVersion> trackedCommittedTxs = new GridConcurrentHashSet<>();
+
+    /** Maps written keys to near xid versions of the transactions that wrote them. */
+    private volatile ConcurrentHashMap<KeyCacheObject, Set<GridCacheVersion>> writtenKeysToNearXidVer = new ConcurrentHashMap<>();
+
+    /** Graph of dependent (by keys) transactions: tx -> txs that accessed the same keys after it. */
+    private volatile ConcurrentHashMap<GridCacheVersion, Set<GridCacheVersion>> dependentTransactionsGraph = new ConcurrentHashMap<>();
+    // todo GG-13416: maybe handle local sequential consistency with threadId
+
+    /** State rw-lock. */
+    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+    /** Track prepared flag. */
+    private final AtomicBoolean trackPrepared = new AtomicBoolean(false);
+
+    /** Track committed flag. */
+    private final AtomicBoolean trackCommitted = new AtomicBoolean(false);
+
+    /** Tx finish awaiting. */
+    private volatile TxFinishAwaiting txFinishAwaiting;
+
+    /**
+     * Tx finish awaiting facility.
+     */
+    private class TxFinishAwaiting {
+        /** Future. */
+        private final GridFutureAdapter<Set<GridCacheVersion>> fut;
+
+        /** Not committed in timeout txs. */
+        private final Set<GridCacheVersion> notCommittedInTimeoutTxs;
+
+        /** Committing txs. */
+        private final Set<GridCacheVersion> committingTxs;
+
+        /** Global committing txs added. */
+        private volatile boolean globalCommittingTxsAdded;
+
+        /** Awaiting prepared is done. */
+        private volatile boolean awaitingPreparedIsDone;
+
+        /** Timeout. */
+        private volatile boolean timeout;
+
+        /**
+         * @param preparedTxsTimeout Timeout in milliseconds for awaiting of prepared transactions.
+         * @param committingTxsTimeout Timeout in milliseconds for awaiting of committing transactions.
+         */
+        private TxFinishAwaiting(final long preparedTxsTimeout, final long committingTxsTimeout) {
+            assert preparedTxsTimeout > 0 : preparedTxsTimeout;
+            assert committingTxsTimeout > 0 : committingTxsTimeout;
+            assert committingTxsTimeout >= preparedTxsTimeout : committingTxsTimeout + " < " + preparedTxsTimeout;
+
+            fut = new GridFutureAdapter<>();
+
+            notCommittedInTimeoutTxs = new GridConcurrentHashSet<>(preparedCommittedTxsCounters.keySet());
+
+            committingTxs = U.newConcurrentHashSet(currentlyCommittingTxs);
+
+            if (committingTxsTimeout > preparedTxsTimeout) {
+                cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(preparedTxsTimeout) {
+                    @Override public void onTimeout() {
+                        awaitingPreparedIsDone = true;
+
+                        if (TxFinishAwaiting.this != txFinishAwaiting || fut.isDone())
+                            return;
+
+                        stateLock.readLock().lock();
+
+                        try {
+                            if (allCommittingIsFinished())
+                                finish();
+                            else
+                                log.warning("Committing transactions not completed in " + preparedTxsTimeout + " ms: "
+                                    + committingTxs);
+                        }
+                        finally {
+                            stateLock.readLock().unlock();
+                        }
+                    }
+                });
+            }
+
+            cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(committingTxsTimeout) {
+                @Override public void onTimeout() {
+                    timeout = true;
+
+                    if (committingTxsTimeout == preparedTxsTimeout)
+                        awaitingPreparedIsDone = true;
+
+                    if (TxFinishAwaiting.this != txFinishAwaiting || fut.isDone())
+                        return;
+
+                    stateLock.readLock().lock();
+
+                    try {
+                        if (!allCommittingIsFinished())
+                            log.warning("Committing transactions not completed in " + committingTxsTimeout + " ms: "
+                                + committingTxs);
+
+                        finish();
+                    }
+                    finally {
+                        stateLock.readLock().unlock();
+                    }
+                }
+            });
+        }
+
+        /**
+         * @param nearXidVer Near xid version of the finished (committed or rolled back) transaction.
+         */
+        void onTxFinished(GridCacheVersion nearXidVer) {
+            notCommittedInTimeoutTxs.remove(nearXidVer);
+
+            checkTxsFinished();
+        }
+
+        /**
+         * Calls {@link #finish()} if no awaited txs remain or, after prepared timeout, all committing txs are done.
+         */
+        void checkTxsFinished() {
+            if (notCommittedInTimeoutTxs.isEmpty() || awaitingPreparedIsDone && allCommittingIsFinished())
+                finish();
+        }
+
+        /**
+         * Completes the future with unfinished txs, but only once global committing txs were added or timeout fired.
+         */
+        void finish() {
+            if (globalCommittingTxsAdded || timeout) {
+                txFinishAwaiting = null;
+
+                fut.onDone(notCommittedInTimeoutTxs.isEmpty() ?
+                    Collections.emptySet() :
+                    U.sealSet(notCommittedInTimeoutTxs));
+            }
+        }
+
+        /**
+         * @return {@code true} if {@code committingTxs}, pruned to txs that are still not finished, is empty.
+         */
+        boolean allCommittingIsFinished() {
+            committingTxs.retainAll(notCommittedInTimeoutTxs);
+
+            return committingTxs.isEmpty();
+        }
+
+        /**
+         * @param globalCommittingTxs Global committing txs; those still pending locally join {@code committingTxs}.
+         */
+        void addGlobalCommittingTxs(Set<GridCacheVersion> globalCommittingTxs) {
+            assert stateLock.writeLock().isHeldByCurrentThread();
+
+            notCommittedInTimeoutTxs.addAll(preparedCommittedTxsCounters.keySet());
+
+            Set<GridCacheVersion> pendingTxs = new HashSet<>(notCommittedInTimeoutTxs);
+
+            pendingTxs.retainAll(globalCommittingTxs);
+
+            committingTxs.addAll(pendingTxs);
+
+            globalCommittingTxsAdded = true;
+
+            assert !fut.isDone() || timeout;
+
+            checkTxsFinished();
+        }
+    }
+
+    /**
+     * @param cctx Shared cache context.
+     */
+    public LocalPendingTransactionsTracker(GridCacheSharedContext<?, ?> cctx) {
+        this.cctx = cctx;
+
+        log = cctx.logger(getClass());
+    }
+
+    /**
+     * Enables pending transactions tracking. Asserts that the kernal gateway is in {@code STARTING} state.
+     */
+    void enable() {
+        assert cctx.kernalContext().gateway().getState() == GridKernalState.STARTING;
+
+        enabled = true;
+    }
+
+    /**
+     * @return {@code True} if this tracker is enabled.
+     */
+    public boolean enabled() {
+        return enabled;
+    }
+
+    /**
+     * Returns a collection of transactions {@code P2} that are prepared but not yet committed
+     * between phase {@code Cut1} and phase {@code Cut2}.
+     *
+     * @return Collection of prepared transactions.
+     */
+    public Set<GridCacheVersion> currentlyPreparedTxs() {
+        assert stateLock.writeLock().isHeldByCurrentThread();
+
+        return U.sealSet(preparedCommittedTxsCounters.keySet());
+    }
+
+    /**
+     * Starts tracking transactions that will form a set of transactions {@code P23}
+     * that were prepared between phase {@code Cut2} and phase {@code Cut3}.
+     */
+    public void startTrackingPrepared() {
+        assert stateLock.writeLock().isHeldByCurrentThread();
+        assert !trackPrepared.get(): "Tracking prepared transactions is already initialized.";
+
+        trackPrepared.set(true);
+    }
+
+    /**
+     * @return Near xid versions of transactions prepared since last {@link #startTrackingPrepared()} call.
+     */
+    public Set<GridCacheVersion> stopTrackingPrepared() {
+        assert stateLock.writeLock().isHeldByCurrentThread();
+        assert trackPrepared.get(): "Tracking prepared transactions is not initialized yet.";
+
+        trackPrepared.set(false);
+
+        Set<GridCacheVersion> res = U.sealSet(trackedPreparedTxs);
+
+        trackedPreparedTxs = new GridConcurrentHashSet<>();
+
+        return res;
+    }
+
+    /**
+     * Starts tracking committed transactions {@code C12} between phase {@code Cut1} and phase {@code Cut2}.
+     */
+    public void startTrackingCommitted() {
+        assert stateLock.writeLock().isHeldByCurrentThread();
+        assert !trackCommitted.get() : "Tracking committed transactions is already initialized.";
+
+        trackCommitted.set(true);
+    }
+
+    /**
+     * @return Transactions committed during tracked period along with graph of dependent (by keys) transactions.
+     */
+    public TrackCommittedResult stopTrackingCommitted() {
+        assert stateLock.writeLock().isHeldByCurrentThread();
+        assert trackCommitted.get() : "Tracking committed transactions is not initialized yet.";
+
+        trackCommitted.set(false);
+
+        Set<GridCacheVersion> committedTxs = U.sealSet(trackedCommittedTxs);
+
+        Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxs = U.sealMap(dependentTransactionsGraph);
+
+        trackedCommittedTxs = new GridConcurrentHashSet<>();
+
+        writtenKeysToNearXidVer = new ConcurrentHashMap<>();
+
+        dependentTransactionsGraph = new ConcurrentHashMap<>();
+
+        return new TrackCommittedResult(committedTxs, dependentTxs);
+    }
+
+    /**
+     * @param preparedTxsTimeout Timeout in milliseconds for awaiting of prepared transactions.
+     * @param committingTxsTimeout Timeout in milliseconds for awaiting of committing transactions.
+     * @return Collection of local transactions in committing state.
+     */
+    public Set<GridCacheVersion> startTxFinishAwaiting(
+        long preparedTxsTimeout, long committingTxsTimeout) {
+
+        assert stateLock.writeLock().isHeldByCurrentThread();
+
+        assert txFinishAwaiting == null : txFinishAwaiting;
+
+        TxFinishAwaiting awaiting = new TxFinishAwaiting(preparedTxsTimeout, committingTxsTimeout);
+
+        txFinishAwaiting = awaiting;
+
+        return awaiting.committingTxs;
+    }
+
+    /**
+     * @param globalCommittingTxs Global committing transactions.
+     * @return Future with collection of transactions that failed to finish within timeout.
+     */
+    public IgniteInternalFuture<Set<GridCacheVersion>> awaitPendingTxsFinished(
+        Set<GridCacheVersion> globalCommittingTxs
+    ) {
+        assert stateLock.writeLock().isHeldByCurrentThread();
+
+        TxFinishAwaiting awaiting = txFinishAwaiting;
+
+        assert awaiting != null;
+
+        awaiting.addGlobalCommittingTxs(globalCommittingTxs);
+
+        return awaiting.fut;
+    }
+
+    /**
+     * Freezes state of all tracker collections. Any active transactions that modify collections will
+     * wait on readLock().
+     * Can be used to obtain consistent snapshot of several collections.
+     */
+    public void writeLockState() {
+        stateLock.writeLock().lock();
+    }
+
+    /**
+     * Unfreezes state of all tracker collections, releases waiting transactions.
+     */
+    public void writeUnlockState() {
+        stateLock.writeLock().unlock();
+    }
+
+    /**
+     * @param nearXidVer Near xid version.
+     */
+    public void onTxPrepared(GridCacheVersion nearXidVer) {
+        if (!enabled)
+            return;
+
+        stateLock.readLock().lock();
+
+        try {
+            preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> value == null ? 1 : value + 1);
+
+            if (trackPrepared.get())
+                trackedPreparedTxs.add(nearXidVer);
+        }
+        finally {
+            stateLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param nearXidVer Near xid version.
+     */
+    public void onTxCommitted(GridCacheVersion nearXidVer) {
+        if (!enabled)
+            return;
+
+        stateLock.readLock().lock();
+
+        try {
+            Integer newCntr = preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> {
+                if (value == null || value <= 0) {
+                    throw new AssertionError("Committing transaction that was rolled back or concurrently committed " +
+                        "[nearXidVer=" + nearXidVer + ", currentCntr=" + value + ']');
+                }
+
+                if (value == 1)
+                    return null;
+
+                return value - 1;
+            });
+
+            if (newCntr == null) {
+                currentlyCommittingTxs.remove(nearXidVer);
+
+                if (trackCommitted.get())
+                    trackedCommittedTxs.add(nearXidVer);
+
+                checkTxFinishFutureDone(nearXidVer);
+            }
+        }
+        finally {
+            stateLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param nearXidVer Near xid version.
+     */
+    public void onTxRolledBack(GridCacheVersion nearXidVer) {
+        if (!enabled)
+            return;
+
+        stateLock.readLock().lock();
+
+        try {
+            Integer newCntr = preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> {
+                if (value == null || value <= 1)
+                    return null;
+
+                return value - 1;
+            });
+
+            if (newCntr == null) {
+                currentlyCommittingTxs.remove(nearXidVer);
+
+                checkTxFinishFutureDone(nearXidVer);
+            }
+        }
+        finally {
+            stateLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param nearXidVer Near xid version.
+     * @param keys Keys written by the transaction.
+     */
+    public void onKeysWritten(GridCacheVersion nearXidVer, List<KeyCacheObject> keys) {
+        if (!enabled)
+            return;
+
+        stateLock.readLock().lock();
+
+        try {
+            if (!preparedCommittedTxsCounters.containsKey(nearXidVer))
+                throw new AssertionError("Tx should be in PREPARED state when logging data records: " + nearXidVer);
+
+            currentlyCommittingTxs.add(nearXidVer);
+
+            if (!trackCommitted.get())
+                return;
+
+            for (KeyCacheObject key : keys) {
+                writtenKeysToNearXidVer.compute(key, (keyObj, keyTxsSet) -> {
+                    Set<GridCacheVersion> keyTxs = keyTxsSet == null ? new HashSet<>() : keyTxsSet;
+
+                    for (GridCacheVersion previousTx : keyTxs) {
+                        dependentTransactionsGraph.compute(previousTx, (tx, depTxsSet) -> {
+                            Set<GridCacheVersion> dependentTxs = depTxsSet == null ? new HashSet<>() : depTxsSet;
+
+                            dependentTxs.add(nearXidVer);
+
+                            return dependentTxs;
+                        });
+                    }
+
+                    keyTxs.add(nearXidVer);
+
+                    return keyTxs;
+                });
+            }
+        }
+        finally {
+            stateLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param nearXidVer Near xid version.
+     * @param keys Keys read by the transaction.
+     */
+    public void onKeysRead(GridCacheVersion nearXidVer, List<KeyCacheObject> keys) {
+        if (!enabled)
+            return;
+
+        stateLock.readLock().lock();
+
+        try {
+            if (!preparedCommittedTxsCounters.containsKey(nearXidVer))
+                throw new AssertionError("Tx should be in PREPARED state when logging data records: " + nearXidVer);
+
+            currentlyCommittingTxs.add(nearXidVer);
+
+            if (!trackCommitted.get())
+                return;
+
+            for (KeyCacheObject key : keys) {
+                writtenKeysToNearXidVer.computeIfPresent(key, (keyObj, keyTxsSet) -> {
+                    for (GridCacheVersion previousTx : keyTxsSet) {
+                        dependentTransactionsGraph.compute(previousTx, (tx, depTxsSet) -> {
+                            Set<GridCacheVersion> dependentTxs = depTxsSet == null ? new HashSet<>() : depTxsSet;
+
+                            dependentTxs.add(nearXidVer);
+
+                            return dependentTxs;
+                        });
+                    }
+
+                    return keyTxsSet;
+                });
+            }
+        }
+        finally {
+            stateLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Resets tracking state of this tracker: tx finish awaiting, tracking flags, tracked txs and key dependencies.
+     */
+    public void reset() {
+        stateLock.writeLock().lock();
+
+        try {
+            txFinishAwaiting = null;
+
+            trackCommitted.set(false);
+
+            trackedCommittedTxs = new GridConcurrentHashSet<>();
+
+            trackPrepared.set(false);
+
+            trackedPreparedTxs = new GridConcurrentHashSet<>();
+
+            writtenKeysToNearXidVer = new ConcurrentHashMap<>();
+
+            dependentTransactionsGraph = new ConcurrentHashMap<>();
+        }
+        finally {
+            stateLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param nearXidVer Near xid version of the finished transaction to report to current tx finish awaiting, if any.
+     */
+    private void checkTxFinishFutureDone(GridCacheVersion nearXidVer) {
+        if (!enabled)
+            return;
+
+        TxFinishAwaiting awaiting = txFinishAwaiting;
+
+        if (awaiting != null)
+            awaiting.onTxFinished(nearXidVer);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TrackCommittedResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TrackCommittedResult.java
new file mode 100644
index 0000000..2ac5097
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TrackCommittedResult.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Tuple for result of {@link LocalPendingTransactionsTracker#stopTrackingCommitted()}.
+ */
+public class TrackCommittedResult {
+    /** Transactions committed during tracked period. */
+    private final Set<GridCacheVersion> committedTxs;
+
+    /** Graph of dependent (by keys) transactions. */
+    private final Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph;
+
+    /**
+     * @param committedTxs Committed txs.
+     * @param dependentTxsGraph Dependent txs graph.
+     */
+    public TrackCommittedResult(
+        Set<GridCacheVersion> committedTxs,
+        Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph
+    ) {
+        this.committedTxs = committedTxs;
+        this.dependentTxsGraph = dependentTxsGraph;
+    }
+
+    /**
+     * @return Transactions committed during tracked period.
+     */
+    public Set<GridCacheVersion> committedTxs() {
+        return committedTxs;
+    }
+
+    /**
+     * @return Graph of dependent (by keys) transactions.
+     */
+    public Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph() {
+        return dependentTxsGraph;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 40b20ee..03e6462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 
@@ -61,8 +62,12 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     /** Data center ID. */
     private byte dataCenterId;
 
-    /** */
-    private long gridStartTime;
+    /**
+     * Oldest cluster node start timestamp, lazily initialized.
+     *
+     * @see DiscoverySpi#getGridStartTime().
+     */
+    private volatile long gridStartTime;
 
     /** */
     private GridCacheVersion ISOLATED_STREAMER_VER;
@@ -324,4 +329,11 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     public boolean isStartVersion(GridCacheVersion ver) {
         return startVer.equals(ver);
     }
+
+    /**
+     * Invalidates first cluster node start timestamp, it can be reinitialized lazily in the future.
+     */
+    public void invalidateGridStartTime() {
+        gridStartTime = 0;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java
index 7f2b4b0..6a02e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java
@@ -19,6 +19,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -75,4 +76,23 @@ public class BaselineTopologyHistoryItem implements Serializable {
     public List<Long> branchingHistory() {
         return branchingHistory;
     }
+
+    /**
+     * Returns {@code true} if baseline topology history item contains node with given consistent ID.
+     *
+     * @param consistentId Consistent ID.
+     * @return {@code True} if baseline topology history item contains node with given consistent ID.
+     */
+    public boolean containsNode(Object consistentId) {
+        return consIds.contains(consistentId);
+    }
+
+    /**
+     * Returns a copy of consistent ids of nodes that are included in this baseline topology item.
+     *
+     * @return Collection of consistent ids.
+     */
+    public Collection<Object> consistentIds() {
+        return U.arrayList(this.consIds);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index d19585d..d41ff14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -197,6 +197,14 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
     }
 
     /**
+     * Sets timestamp.
+     * @param timestamp Timestamp.
+     */
+    public void timestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
      * @return State change request ID.
      */
     public UUID requestId() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index c958e17..6357791 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -1,12 +1,12 @@
 /*
  * Copyright 2019 GridGain Systems, Inc. and Contributors.
- * 
+ *
  * Licensed under the GridGain Community Edition License (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
@@ -335,6 +336,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /**
+     * Returns baseline topology history.
+     */
+    public BaselineTopologyHistory baselineHistory() {
+        return bltHist;
+    }
+
+    /**
      * @param blt Blt.
      */
     private void writeBaselineTopology(BaselineTopology blt, BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException {
@@ -443,20 +451,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     /**
      * Checks whether all conditions to meet BaselineTopology are satisfied.
      */
-    private boolean isBaselineSatisfied(BaselineTopology blt, List<ClusterNode> serverNodes) {
+    private boolean isBaselineSatisfied(BaselineTopology blt, List<ClusterNode> srvNodes) {
         if (blt == null)
             return false;
 
         if (blt.consistentIds() == null)
             return false;
 
-        if (//only node participating in BaselineTopology is allowed to send activation command...
-            blt.consistentIds().contains(ctx.discovery().localNode().consistentId())
-                //...and with this node BaselineTopology is reached
-                && blt.isSatisfied(serverNodes))
-            return true;
-
-        return false;
+        // Only node participating in BaselineTopology is allowed to send activation command
+        // and with this node BaselineTopology is reached.
+        return blt.consistentIds().contains(ctx.discovery().localNode().consistentId()) && blt.isSatisfied(srvNodes);
     }
 
     /** {@inheritDoc} */
@@ -523,10 +527,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     ) {
         DiscoveryDataClusterState state = globalState;
 
-        if (log.isInfoEnabled())
-            U.log(log, "Received " + prettyStr(msg.activate()) + " request with BaselineTopology" +
-                (msg.baselineTopology() == null ? ": null"
-                    : "[id=" + msg.baselineTopology().id() + "]"));
+        if (log.isInfoEnabled()) {
+            String baseline = msg.baselineTopology() == null ? ": null" : "[id=" + msg.baselineTopology().id() + ']';
+
+            U.log(log, "Received " + prettyStr(msg.activate()) + " request with BaselineTopology" + baseline);
+        }
 
         if (msg.baselineTopology() != null)
             compatibilityMode = false;
@@ -592,7 +597,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 BaselineTopologyHistoryItem bltHistItem = BaselineTopologyHistoryItem.fromBaseline(
                     globalState.baselineTopology());
 
-                transitionFuts.put(msg.requestId(), new GridFutureAdapter<Void>());
+                transitionFuts.put(msg.requestId(), new GridFutureAdapter<>());
 
                 DiscoveryDataClusterState prevState = globalState;
 
@@ -679,7 +684,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         if (initiatorNode.equals(ctx.localNodeId())) {
             GridChangeGlobalStateFuture fut = stateChangeFut.get();
 
-            if (fut != null && fut.requestId.equals(reqId))
+            if (fut != null && fut.reqId.equals(reqId))
                 return fut;
         }
 
@@ -774,19 +779,19 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             return;
         }
 
-        BaselineTopologyHistory historyToSend = null;
+        BaselineTopologyHistory histToSnd = null;
 
         if (!bltHist.isEmpty()) {
             if (joiningNodeState != null && joiningNodeState.baselineTopology() != null) {
                 int lastId = joiningNodeState.baselineTopology().id();
 
-                historyToSend = bltHist.tailFrom(lastId);
+                histToSnd = bltHist.tailFrom(lastId);
             }
             else
-                historyToSend = bltHist;
+                histToSnd = bltHist;
         }
 
-        dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, historyToSend));
+        dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, histToSnd));
     }
 
     /** {@inheritDoc} */
@@ -811,7 +816,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             DiscoveryDataClusterState state = stateDiscoData.globalState;
 
             if (state.transition())
-                transitionFuts.put(state.transitionRequestId(), new GridFutureAdapter<Void>());
+                transitionFuts.put(state.transitionRequestId(), new GridFutureAdapter<>());
 
             globalState = state;
 
@@ -826,9 +831,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     @Override public IgniteInternalFuture<?> changeGlobalState(
         final boolean activate,
         Collection<? extends BaselineNode> baselineNodes,
-        boolean forceChangeBaselineTopology
+        boolean forceChangeBaselineTop
     ) {
-        return changeGlobalState(activate, baselineNodes, forceChangeBaselineTopology, false);
+        return changeGlobalState(activate, baselineNodes, forceChangeBaselineTop, false);
     }
 
     public IgniteInternalFuture<?> changeGlobalState(
@@ -848,15 +853,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
      */
     private BaselineTopology calculateNewBaselineTopology(final boolean activate,
         Collection<? extends BaselineNode> baselineNodes,
-        boolean forceChangeBaselineTopology) {
+        boolean forceChangeBaselineTop
+    ) {
         BaselineTopology newBlt;
 
-        BaselineTopology currentBlt = globalState.baselineTopology();
+        BaselineTopology currBlt = globalState.baselineTopology();
 
         int newBltId = 0;
 
-        if (currentBlt != null)
-            newBltId = activate ? currentBlt.id() + 1 : currentBlt.id();
+        if (currBlt != null)
+            newBltId = activate ? currBlt.id() + 1 : currBlt.id();
 
         if (baselineNodes != null && !baselineNodes.isEmpty()) {
             List<BaselineNode> baselineNodes0 = new ArrayList<>();
@@ -875,16 +881,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             baselineNodes = baselineNodes0;
         }
 
-        if (forceChangeBaselineTopology)
+        if (forceChangeBaselineTop)
             newBlt = BaselineTopology.build(baselineNodes, newBltId);
         else if (activate) {
             if (baselineNodes == null)
                 baselineNodes = baselineNodes();
 
-            if (currentBlt == null)
+            if (currBlt == null)
                 newBlt = BaselineTopology.build(baselineNodes, newBltId);
             else {
-                newBlt = currentBlt;
+                newBlt = currBlt;
 
                 newBlt.updateHistory(baselineNodes);
             }
@@ -899,32 +905,38 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     private Collection<BaselineNode> baselineNodes() {
         List<ClusterNode> clNodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
 
-        ArrayList<BaselineNode> bltNodes = new ArrayList<>(clNodes.size());
+        List<BaselineNode> bltNodes = new ArrayList<>(clNodes.size());
 
-        for (ClusterNode clNode : clNodes)
-            bltNodes.add(clNode);
+        bltNodes.addAll(clNodes);
 
         return bltNodes;
     }
 
     /** */
-    private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
-        BaselineTopology blt, boolean forceChangeBaselineTopology) {
-        return changeGlobalState0(activate, blt, forceChangeBaselineTopology, false);
+    private IgniteInternalFuture<?> changeGlobalState0(
+        final boolean activate,
+        BaselineTopology blt,
+        boolean forceChangeBaselineTop
+    ) {
+        return changeGlobalState0(activate, blt, forceChangeBaselineTop, false);
     }
 
     /** */
-    private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
-        BaselineTopology blt, boolean forceChangeBaselineTopology, boolean isAutoAdjust) {
+    private IgniteInternalFuture<?> changeGlobalState0(
+        final boolean activate,
+        BaselineTopology blt,
+        boolean forceChangeBaselineTop,
+        boolean isAutoAdjust
+    ) {
         boolean isBaselineAutoAdjustEnabled = isBaselineAutoAdjustEnabled();
 
-        if (forceChangeBaselineTopology && isBaselineAutoAdjustEnabled != isAutoAdjust)
+        if (forceChangeBaselineTop && isBaselineAutoAdjustEnabled != isAutoAdjust)
             throw new BaselineAdjustForbiddenException(isBaselineAutoAdjustEnabled);
 
         if (ctx.isDaemon() || ctx.clientNode()) {
             GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
 
-            sendComputeChangeGlobalState(activate, blt, forceChangeBaselineTopology, fut);
+            sendComputeChangeGlobalState(activate, blt, forceChangeBaselineTop, fut);
 
             return fut;
         }
@@ -995,15 +1007,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             }
         }
 
-        ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId,
+        ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.reqId,
             ctx.localNodeId(),
             storedCfgs,
             activate,
             blt,
-            forceChangeBaselineTopology,
+            forceChangeBaselineTop,
             System.currentTimeMillis()
         );
 
+        ctx.txDr().onChangeGlobalStateMessagePrepared(msg);
+
         IgniteInternalFuture<?> resFut = wrapStateChangeFuture(startedFut, msg);
 
         try {
@@ -1059,16 +1073,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             return null;
 
         if (globalState == null || globalState.baselineTopology() == null) {
-            if (joiningNodeState != null && joiningNodeState.baselineTopology() != null) {
-                String msg = "Node with set up BaselineTopology is not allowed to join cluster without one: " + node.consistentId();
+            if (joiningNodeState.baselineTopology() != null) {
+                String msg = "Node with set up BaselineTopology is not allowed to join cluster without one: " +
+                        node.consistentId();
 
                 return new IgniteNodeValidationResult(node.id(), msg, msg);
             }
         }
 
         if (globalState.transition() && globalState.previousBaselineTopology() == null) {
-            //case when cluster is activating for the first time and other node with existing baseline topology
-            //tries to join
+            // Case when cluster is activating for the first time and other node with existing baseline topology
+            // tries to join.
 
             String msg = "Node with set up BaselineTopology is not allowed " +
                 "to join cluster in the process of first activation: " + node.consistentId();
@@ -1193,12 +1208,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 ", daemon" + ctx.isDaemon() + "]");
         }
 
-        ClusterGroupAdapter clusterGroupAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers();
+        ClusterGroupAdapter clusterGrpAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers();
 
-        if (F.isEmpty(clusterGroupAdapter.nodes()))
+        if (F.isEmpty(clusterGrpAdapter.nodes()))
             return new IgniteFinishedFutureImpl<>(false);
 
-        IgniteCompute comp = clusterGroupAdapter.compute();
+        IgniteCompute comp = clusterGrpAdapter.compute();
 
         return comp.callAsync(new IgniteCallable<Boolean>() {
             @IgniteInstanceResource
@@ -1283,6 +1298,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
                     distributedBaselineConfiguration.onActivate();
 
+                    ctx.txDr().onActivate(ctx);
+
                     if (log.isInfoEnabled())
                         log.info("Successfully performed final activation steps [nodeId="
                             + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
@@ -1333,6 +1350,92 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /**
+     * Executes validation checks of cluster state and BaselineTopology before changing BaselineTopology to new one.
+     */
+    public void validateBeforeBaselineChange(Collection<? extends BaselineNode> baselineTop) {
+        verifyBaselineTopologySupport(ctx.discovery().discoCache());
+
+        if (!ctx.state().clusterState().active())
+            throw new IgniteException("Changing BaselineTopology on inactive cluster is not allowed.");
+
+        if (baselineTop != null) {
+            if (baselineTop.isEmpty())
+                throw new IgniteException("BaselineTopology must contain at least one node.");
+
+            Collection<Object> onlineNodes = onlineBaselineNodesRequestedForRemoval(baselineTop);
+
+            if (onlineNodes != null) {
+                if (!onlineNodes.isEmpty()) {
+                    throw new IgniteException("Removing online nodes from BaselineTopology is not supported: " +
+                            onlineNodes);
+                }
+            }
+        }
+    }
+
+    /**
+     * Verifies that all server nodes in the current cluster topology support the BaselineTopology feature,
+     * so that the compatibilityMode flag can be reset.
+     *
+     * @param discoCache Discovery cache.
+     */
+    private void verifyBaselineTopologySupport(DiscoCache discoCache) {
+        if (discoCache.minimumServerNodeVersion().compareTo(MIN_BLT_SUPPORTING_VER) < 0) {
+            SB sb = new SB("Cluster contains nodes that don't support BaselineTopology: [");
+
+            for (ClusterNode cn : discoCache.serverNodes()) {
+                if (cn.version().compareTo(MIN_BLT_SUPPORTING_VER) < 0)
+                    sb
+                            .a('[')
+                            .a(cn.consistentId())
+                            .a(':')
+                            .a(cn.version())
+                            .a("], ");
+            }
+
+            sb.d(sb.length() - 2, sb.length());
+
+            throw new IgniteException(sb.a(']').toString());
+        }
+    }
+
+    /** */
+    @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval(Collection<? extends BaselineNode> newBlt) {
+        BaselineTopology blt = ctx.state().clusterState().baselineTopology();
+        Set<Object> bltConsIds;
+
+        if (blt == null)
+            return null;
+        else
+            bltConsIds = blt.consistentIds();
+
+        ArrayList<Object> onlineNodesRequestedForRemoval = new ArrayList<>();
+
+        Collection<Object> aliveNodesConsIds = getConsistentIds(ctx.discovery().aliveServerNodes());
+
+        Collection<Object> newBltConsIds = getConsistentIds(newBlt);
+
+        for (Object oldBltConsId : bltConsIds) {
+            if (aliveNodesConsIds.contains(oldBltConsId)) {
+                if (!newBltConsIds.contains(oldBltConsId))
+                    onlineNodesRequestedForRemoval.add(oldBltConsId);
+            }
+        }
+
+        return onlineNodesRequestedForRemoval;
+    }
+
+    /** */
+    private Collection<Object> getConsistentIds(Collection<? extends BaselineNode> nodes) {
+        ArrayList<Object> res = new ArrayList<>(nodes.size());
+
+        for (BaselineNode n : nodes)
+            res.add(n.consistentId());
+
+        return res;
+    }
+
+    /**
      * @param reqId Request ID.
      * @param initNodeId Initialize node id.
      * @param ex Exception.
@@ -1374,14 +1477,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
         if (log.isDebugEnabled()) {
             log.debug("Received activation response [requestId=" + msg.getRequestId() +
-                ", nodeId=" + nodeId + "]");
+                ", nodeId=" + nodeId + ']');
         }
 
-        UUID requestId = msg.getRequestId();
+        UUID reqId = msg.getRequestId();
 
         final GridChangeGlobalStateFuture fut = stateChangeFut.get();
 
-        if (fut != null && requestId.equals(fut.requestId)) {
+        if (fut != null && reqId.equals(fut.reqId)) {
             if (fut.initFut.isDone())
                 fut.onResponse(nodeId, msg);
             else {
@@ -1403,11 +1506,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     private void onStateRestored(BaselineTopology blt) {
         DiscoveryDataClusterState state = globalState;
 
-        if (!state.active() && !state.transition() && state.baselineTopology() == null) {
-            DiscoveryDataClusterState newState = DiscoveryDataClusterState.createState(false, blt);
-
-            globalState = newState;
-        }
+        if (!state.active() && !state.transition() && state.baselineTopology() == null)
+            globalState = DiscoveryDataClusterState.createState(false, blt);
     }
 
     /**
@@ -1518,7 +1618,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchangeFuture, boolean hasMovingPartitions) {
+    @Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchFut, boolean hasMovingPartitions) {
         // no-op
     }
 
@@ -1627,7 +1727,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     private class GridChangeGlobalStateFuture extends GridFutureAdapter<Void> {
         /** Request id. */
         @GridToStringInclude
-        private final UUID requestId;
+        private final UUID reqId;
 
         /** Activate. */
         private final boolean activate;
@@ -1657,12 +1757,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         private final IgniteLogger log;
 
         /**
-         * @param requestId State change request ID.
+         * @param reqId State change request ID.
          * @param activate New cluster state.
          * @param ctx Context.
          */
-        GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
-            this.requestId = requestId;
+        GridChangeGlobalStateFuture(UUID reqId, boolean activate, GridKernalContext ctx) {
+            this.reqId = reqId;
             this.activate = activate;
             this.ctx = ctx;
 
@@ -1670,10 +1770,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         }
 
         /**
-         * @param event Event.
+         * @param evt Event.
          */
-        void onNodeLeft(DiscoveryEvent event) {
-            assert event != null;
+        void onNodeLeft(DiscoveryEvent evt) {
+            assert evt != null;
 
             if (isDone())
                 return;
@@ -1681,7 +1781,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             boolean allReceived = false;
 
             synchronized (mux) {
-                if (remaining.remove(event.eventNode().id()))
+                if (remaining.remove(evt.eventNode().id()))
                     allReceived = remaining.isEmpty();
             }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/NoOpTransactionalDrProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/NoOpTransactionalDrProcessor.java
new file mode 100644
index 0000000..ff82b19
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/NoOpTransactionalDrProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.txdr;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * No-op implementation of {@link TransactionalDrProcessor}.
+ */
+public class NoOpTransactionalDrProcessor extends GridProcessorAdapter implements TransactionalDrProcessor {
+    /**
+     * @param ctx Context.
+     */
+    public NoOpTransactionalDrProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckPointBegin(long snapshotId, WALPointer ptr, SnapshotOperation snapshotOperation) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onPartitionsFullMessagePrepared(
+            @Nullable GridDhtPartitionExchangeId exchId,
+            GridDhtPartitionsFullMessage fullMsg
+    ) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onChangeGlobalStateMessagePrepared(ChangeGlobalStateMessage msg) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean shouldIgnoreAssignPartitionStates(GridDhtPartitionsExchangeFuture fut) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean shouldScheduleRebalance(GridDhtPartitionsExchangeFuture fut) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean shouldApplyUpdateCounterOnRebalance() {
+        return false;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/TransactionalDrProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/TransactionalDrProcessor.java
new file mode 100644
index 0000000..8b679a4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/TransactionalDrProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.txdr;
+
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
+import org.apache.ignite.internal.processors.GridProcessor;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Transactional datacenter replication processor.
+ */
+public interface TransactionalDrProcessor extends GridProcessor, IgniteChangeGlobalStateSupport {
+    /**
+     * @param snapshotId Snapshot id.
+     * @param ptr Pointer to the {@link SnapshotRecord}.
+     * @param snapshotOperation Snapshot operation.
+     */
+    public void onMarkCheckPointBegin(long snapshotId, WALPointer ptr, SnapshotOperation snapshotOperation);
+
+    /**
+     * @param exchId Partition exchange id.
+     *               This parameter is always non-null when the message is created for an exchange.
+     * @param fullMsg Partitions full message.
+     */
+    public void onPartitionsFullMessagePrepared(@Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsFullMessage fullMsg);
+
+    /**
+     * @param msg Change global state message.
+     */
+    public void onChangeGlobalStateMessagePrepared(ChangeGlobalStateMessage msg);
+
+    /**
+     * Returns true if we should skip assigning MOVING state to partitions due to outdated counters.
+     * The idea is to avoid redundant rebalance in case of random discovery events on REPLICA cluster.
+     * If event is "special" and rebalance really should happen according to REPLICA lifecycle, method will return false.
+     *
+     * @param fut Current exchange future.
+     */
+    public boolean shouldIgnoreAssignPartitionStates(GridDhtPartitionsExchangeFuture fut);
+
+    /**
+     * Returns true if we should schedule rebalance for MOVING partitions even if ideal assignment wasn't changed.
+     *
+     * @param fut Current exchange future.
+     */
+    public boolean shouldScheduleRebalance(GridDhtPartitionsExchangeFuture fut);
+
+    /**
+     * Returns {@code true} if update counters that are placed into {@link GridDhtPartitionSupplyMessage#last()}
+     * should be applied when the rebalance is finished.
+     *
+     * @return {@code true} if update counters should be applied when the rebalance is finished.
+     */
+    public boolean shouldApplyUpdateCounterOnRebalance();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java
index 6a271f9..fade773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java
@@ -18,6 +18,8 @@ package org.apache.ignite.internal.util;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
@@ -26,12 +28,13 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * This class implements a circular buffer for efficient data exchange.
  */
-public class GridCircularBuffer<T> {
+public class GridCircularBuffer<T> implements Iterable<T> {
     /** */
     private final long sizeMask;
 
@@ -129,6 +132,50 @@ public class GridCircularBuffer<T> {
         return arr[idx0].update(idx, t, arr.length, c);
     }
 
+    /**
+     * Returns read-only iterator over the elements.
+     * <p>
+     * The iterator can be used concurrently with adding new elements to the buffer,
+     * but the data read through iteration may be inconsistent then.
+     *
+     * @return Iterator.
+     */
+    @NotNull @Override public Iterator<T> iterator() {
+        return new Iterator<T>() {
+            int i;
+            int rest;
+
+            {
+                int tail = (int)sizeMask;
+
+                while (tail >= 0 && arr[tail].item == null)
+                    tail--;
+
+                rest = tail + 1;
+
+                i = rest <= sizeMask ? 0 : idxGen.intValue() & (int)sizeMask;
+            };
+
+            @Override public boolean hasNext() {
+                return rest > 0;
+            }
+
+            @Override public T next() {
+                if (rest <= 0)
+                    throw new NoSuchElementException();
+
+                T res = arr[i++].item;
+
+                if (i > sizeMask)
+                    i = 0;
+
+                rest--;
+
+                return res;
+            }
+        };
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCircularBuffer.class, this);
@@ -202,16 +249,19 @@ public class GridCircularBuffer<T> {
 
             idx = newIdx; // Index should be updated even if closure fails.
 
-            if (c != null && item != null)
-                c.applyx(item);
+            try {
+                if (c != null && item != null)
+                    c.applyx(item);
 
-            V old = item;
+                V old = item;
 
-            item = newItem;
+                item = newItem;
 
-            notifyAll();
-
-            return old;
+                return old;
+            }
+            finally {
+                notifyAll();
+            }
         }
 
         /**
@@ -226,4 +276,4 @@ public class GridCircularBuffer<T> {
             return S.toString(Item.class, this, "hash=" + System.identityHashCode(this));
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 84d6510..81a90bf 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1928,6 +1928,17 @@ public abstract class IgniteUtils {
      * @param <E> Entry type
      * @return Sealed collection.
      */
+    public static <E> Set<E> sealSet(Collection<E> c) {
+        return Collections.unmodifiableSet(new HashSet<>(c));
+    }
+
+    /**
+     * Seal collection.
+     *
+     * @param c Collection to seal.
+     * @param <E> Entry type
+     * @return Sealed collection.
+     */
     public static <E> List<E> sealList(Collection<E> c) {
         return Collections.unmodifiableList(new ArrayList<>(c));
     }
@@ -9503,6 +9514,32 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Creates new {@link Set} based on {@link ConcurrentHashMap}.
+     *
+     * @param <T> Type of elements.
+     * @return New concurrent set.
+     */
+    public static <T> Set<T> newConcurrentHashSet() {
+        return Collections.newSetFromMap(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Constructs a new {@link Set} based on {@link ConcurrentHashMap},
+     * containing the elements in the specified collection.
+     *
+     * @param <T> Type of elements.
+     * @param c Source collection.
+     * @return New concurrent set.
+     */
+    public static <T> Set<T> newConcurrentHashSet(Collection<T> c) {
+        Set<T> set = newConcurrentHashSet();
+
+        set.addAll(c);
+
+        return set;
+    }
+
+    /**
      * Creates new map that limited by size.
      *
      * @param limit Limit for size.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index a57eb95..694636f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -290,6 +290,16 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
+     * @param idx Stripe index.
+     * @return Queue size of specific stripe.
+     */
+    public int queueSize(int idx) {
+        A.ensure(idx >= 0, "Stripe index should be non-negative: " + idx);
+
+        return stripes[idx % stripes.length].queueSize();
+    }
+
+    /**
      * @return Completed tasks count.
      */
     public long completedTasks() {
diff --git a/modules/core/src/main/java/org/apache/ignite/logger/EchoingLogger.java b/modules/core/src/main/java/org/apache/ignite/logger/EchoingLogger.java
new file mode 100644
index 0000000..ebc8241
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/logger/EchoingLogger.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.logger;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link IgniteLogger} decorator that forwards calls to a delegate and echoes formatted messages to a consumer.
+ */
+public class EchoingLogger implements IgniteLogger {
+    /** Logger every call is forwarded to. */
+    private final IgniteLogger delegate;
+
+    /** Receiver of the formatted copy of each echoed message. */
+    private final Consumer<String> echoTo;
+
+    /**
+     * @param delegate Logger to forward to, must not be {@code null}.
+     * @param echoTo Consumer of echoed lines, must not be {@code null}.
+     */
+    public EchoingLogger(@NotNull IgniteLogger delegate, @NotNull Consumer<String> echoTo) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.echoTo = Objects.requireNonNull(echoTo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger getLogger(Object ctgr) {
+        return new EchoingLogger(delegate.getLogger(ctgr), echoTo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void trace(String msg) {
+        if (!delegate.isTraceEnabled())
+            return;
+
+        delegate.trace(msg);
+        echoTo.accept(String.format("[%-23s][%-5s] %s", now(), "TRACE", msg));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void debug(String msg) {
+        if (!delegate.isDebugEnabled())
+            return;
+
+        delegate.debug(msg);
+        echoTo.accept(String.format("[%-23s][%-5s] %s", now(), "DEBUG", msg));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void info(String msg) {
+        if (!delegate.isInfoEnabled())
+            return;
+
+        delegate.info(msg);
+        echoTo.accept(String.format("[%-23s][%-5s] %s", now(), "INFO", msg));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void warning(String msg, @Nullable Throwable e) {
+        delegate.warning(msg, e);
+        String line = String.format("[%-23s][%-5s] %s%s", now(), "WARN", msg, formatThrowable(Optional.ofNullable(e)));
+        echoTo.accept(line);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void error(String msg, @Nullable Throwable e) {
+        delegate.error(msg, e);
+        String line = String.format("[%-23s][%-5s] %s%s", now(), "ERROR", msg, formatThrowable(Optional.ofNullable(e)));
+        echoTo.accept(line);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTraceEnabled() {
+        return delegate.isTraceEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDebugEnabled() {
+        return delegate.isDebugEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isInfoEnabled() {
+        return delegate.isInfoEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isQuiet() {
+        return delegate.isQuiet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String fileName() {
+        return delegate.fileName();
+    }
+
+    /**
+     * @return Current local date and time rendered with {@link DateTimeFormatter#ISO_LOCAL_DATE_TIME}.
+     */
+    private static String now() {
+        return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now());
+    }
+
+    /**
+     * @return Text built by {@link #formatThrowable(Throwable)}, or an empty string when no throwable is present.
+     */
+    private static String formatThrowable(Optional<Throwable> e) {
+        return e.isPresent() ? formatThrowable(e.get()) : "";
+    }
+
+    /**
+     * @return Optional {@code ": <message>"} prefix, a line separator, then one {@code "at ..."} line per stack frame.
+     */
+    private static String formatThrowable(@NotNull Throwable e) {
+        String header = e.getMessage() != null ? ": " + e.getMessage() : "";
+        String frames = Arrays.stream(e.getStackTrace())
+            .map(frame -> "at " + frame)
+            .collect(Collectors.joining(System.lineSeparator()));
+        return header + System.lineSeparator() + frames;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 77c365d..46beeeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1491,6 +1491,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
     }
 
     /**
+     * Overrides the grid start time held by this SPI.
+     *
+     * @param val Grid start time to store.
+     */
+    @SuppressWarnings("unused")
+    public void setGridStartTime(long val) {
+        gridStartTime = val;
+    }
+
+    /**
      * @param sockAddr Remote address.
      * @param timeoutHelper Timeout helper.
      * @return Opened socket.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
index 7d09c23..c0c3289 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -119,4 +120,13 @@ public class ClusterReadOnlyModeAbstractTest extends GridCommonAbstractTest {
             ignite.context().cache().context().readOnlyMode(readOnly);
         }
     }
+
+    /** Asserts that the root cause of {@code e} is {@link IgniteClusterReadOnlyException}; {@code null} is a no-op. */
+    public static void checkThatRootCauseIsReadOnly(Throwable e) {
+        Throwable root = e;
+        while (root != null && root.getCause() != null)
+            root = root.getCause();
+        if (root != null)
+            assertTrue(root.getMessage(), root instanceof IgniteClusterReadOnlyException);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
index be9bf30..c0f243a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
@@ -84,7 +84,7 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
                             fail("Put must fail for cache " + cacheName);
                         }
                         catch (Exception e) {
-                            // No-op.
+                            checkThatRootCauseIsReadOnly(e);
                         }
 
                         // All removes must fail.
@@ -94,7 +94,7 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
                             fail("Remove must fail for cache " + cacheName);
                         }
                         catch (Exception e) {
-                            // No-op.
+                            checkThatRootCauseIsReadOnly(e);
                         }
                     }
                     else {
@@ -115,7 +115,7 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
 
         for (Ignite ignite : G.allGrids()) {
             for (String cacheName : CACHE_NAMES) {
-                boolean failed = false;
+                Throwable failed = null;
 
                 try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
                     for (int i = 0; i < 10; i++) {
@@ -125,11 +125,13 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
                     }
                 }
                 catch (CacheException ignored) {
-                    failed = true;
+                    failed = ignored;
                 }
 
-                if (failed != readOnly)
+                if ((failed == null) == readOnly)
                     fail("Streaming to " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
+
+                checkThatRootCauseIsReadOnly(failed);
             }
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 416a726..62863ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -166,6 +166,8 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
         System.setProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED,
             Boolean.toString(enablePendingTxTracker));
 
+        System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, Boolean.toString(enablePendingTxTracker));
+
         return cfg;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
index 855f3a8..2ef8878 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
@@ -170,7 +170,7 @@ public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTes
         boolean fail = false;
 
         try (WALIterator it = sharedContext.wal().replay(null)) {
-            dbMgr.applyUpdatesOnRecovery(it, (ptr, rec) -> true, (entry) -> true);
+            dbMgr.applyUpdatesOnRecovery(it, (ptr, rec) -> true, (rec, entry) -> true);
         }
         catch (IgniteCheckedException e) {
             if (nodeStopPoint.needCleanUp)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index a1ae17e..a4c7a13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -20,6 +20,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
@@ -42,7 +43,7 @@ public class SegmentAwareTest {
      */
     @Test
     public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException {
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         int iterationCnt = 100_000;
         int segmentToHandle = 1;
@@ -80,7 +81,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -97,7 +98,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -114,7 +115,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -137,7 +138,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishAwaitSegment_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -154,7 +155,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -174,7 +175,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -194,7 +195,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -214,7 +215,7 @@ public class SegmentAwareTest {
     @Test
     public void testCorrectCalculateNextSegmentIndex() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.curAbsWalIdx(5);
 
@@ -231,7 +232,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(2, false);
+        SegmentAware aware = new SegmentAware(2, false, new NullLogger());
 
         aware.curAbsWalIdx(1);
         aware.setLastArchivedAbsoluteIndex(-1);
@@ -251,7 +252,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(2, false);
+        SegmentAware aware = new SegmentAware(2, false, new NullLogger());
 
         aware.curAbsWalIdx(1);
         aware.setLastArchivedAbsoluteIndex(-1);
@@ -271,7 +272,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(2, false);
+        SegmentAware aware = new SegmentAware(2, false, new NullLogger());
 
         aware.curAbsWalIdx(2);
         aware.setLastArchivedAbsoluteIndex(-1);
@@ -297,7 +298,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
 
@@ -314,7 +315,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishSegmentArchived_WhenMarkExactWaitingSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
 
@@ -331,7 +332,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishSegmentArchived_WhenSetGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
 
@@ -348,7 +349,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishSegmentArchived_WhenMarkGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
 
@@ -365,7 +366,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishSegmentArchived_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -385,7 +386,7 @@ public class SegmentAwareTest {
     @Test
     public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
 
@@ -404,7 +405,7 @@ public class SegmentAwareTest {
     @Test
     public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
 
         IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5));
@@ -425,7 +426,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, true);
+        SegmentAware aware = new SegmentAware(10, true, new NullLogger());
 
         aware.onSegmentCompressed(5);
 
@@ -444,7 +445,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, true);
+        SegmentAware aware = new SegmentAware(10, true, new NullLogger());
 
         aware.onSegmentCompressed(5);
 
@@ -462,7 +463,7 @@ public class SegmentAwareTest {
      */
     @Test
     public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException {
-        SegmentAware aware = new SegmentAware(10, true);
+        SegmentAware aware = new SegmentAware(10, true, new NullLogger());
 
         aware.setLastArchivedAbsoluteIndex(6);
 
@@ -476,7 +477,7 @@ public class SegmentAwareTest {
     @Test
     public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, true);
+        SegmentAware aware = new SegmentAware(10, true, new NullLogger());
         aware.onSegmentCompressed(5);
 
         IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress);
@@ -493,7 +494,7 @@ public class SegmentAwareTest {
      */
     @Test
     public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException {
-        SegmentAware aware = new SegmentAware(10, true);
+        SegmentAware aware = new SegmentAware(10, true, new NullLogger());
 
         for (int i = 0; i < 5; i++) {
             aware.setLastArchivedAbsoluteIndex(i);
@@ -518,7 +519,7 @@ public class SegmentAwareTest {
     @Test
     public void testReserveCorrectly() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         //when: reserve one segment twice and one segment once.
         aware.reserve(5);
@@ -562,7 +563,7 @@ public class SegmentAwareTest {
     @Test
     public void testAssertFail_WhenReleaseUnreservedSegment() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.reserve(5);
         try {
@@ -582,7 +583,7 @@ public class SegmentAwareTest {
     @Test
     public void testReserveWorkSegmentCorrectly() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         //when: lock one segment twice.
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
@@ -616,7 +617,7 @@ public class SegmentAwareTest {
     @Test
     public void testAssertFail_WhenReleaseUnreservedWorkSegment() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10, false);
+        SegmentAware aware = new SegmentAware(10, false, new NullLogger());
 
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
         try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
index 956014f..a6a9b0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
@@ -18,22 +18,32 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.OpenOption;
 import java.util.List;
 import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
 import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
@@ -53,16 +63,24 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.reader
  * The test check, that StandaloneWalRecordsIterator correctly close file descriptors associated with WAL files.
  */
 public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
+    /** Wal segment size. */
+    private static final int WAL_SEGMENT_SIZE = 512 * 1024;
+
+    /** Wal compaction enabled. */
+    private boolean walCompactionEnabled;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(name);
 
         cfg.setDataStorageConfiguration(
-            new DataStorageConfiguration().
-                setDefaultDataRegionConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
                     new DataRegionConfiguration()
                         .setPersistenceEnabled(true)
                 )
+                .setWalSegmentSize(WAL_SEGMENT_SIZE)
+                .setWalCompactionEnabled(walCompactionEnabled)
         );
 
         return cfg;
@@ -70,6 +88,8 @@ public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
+        walCompactionEnabled = false;
+
         super.beforeTest();
 
         stopAllGrids();
@@ -89,6 +109,195 @@ public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
     /**
      *
      */
+    public void testBlinkingTemporaryFile() throws Exception {
+        // WAL archive iteration must stay complete while a ".zip.tmp" copy of segment 1 keeps blinking.
+        // NOTE(review): no @Test annotation on this method - confirm the runner still picks it up.
+        walCompactionEnabled = true;
+
+        IgniteEx ig = (IgniteEx)startGrid();
+
+        String archiveWalDir = getArchiveWalDirPath(ig);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(
+            new CacheConfiguration<>().setName("c-n").setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+        IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database();
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        WALPointer fromPtr = null;
+
+        int recordsCnt = WAL_SEGMENT_SIZE / 8 /* record size */ * 5;
+
+        AtomicBoolean stopBlinking = new AtomicBoolean(false);
+        AtomicInteger blinkIterations = new AtomicInteger(0);
+
+        IgniteInternalFuture blinkFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (walMgr.lastCompactedSegment() < 2)
+                        U.sleep(10);
+
+                    File walArchive = new File(archiveWalDir);
+                    File consIdFolder = new File(walArchive, "node00-" + ig.cluster().localNode().consistentId().toString());
+                    File compressedWalSegment = new File(consIdFolder, FileDescriptor.fileName(1) + ".zip");
+                    File compressedTmpWalSegment = new File(consIdFolder, FileDescriptor.fileName(1) + ".zip.tmp");
+
+                    while (!stopBlinking.get()) {
+                        Files.copy(compressedWalSegment.toPath(), compressedTmpWalSegment.toPath());
+
+                        U.sleep(10);
+
+                        U.delete(compressedTmpWalSegment);
+
+                        blinkIterations.incrementAndGet();
+                    }
+                }
+                catch (IgniteInterruptedCheckedException | IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, "blinky");
+
+        for (int i = 0; i < recordsCnt; i++) {
+            WALPointer ptr = walMgr.log(new PartitionDestroyRecord(i, i));
+
+            if (i == 100)
+                fromPtr = ptr;
+        }
+
+        assertNotNull(fromPtr);
+
+        cache.put(1, 1);
+        forceCheckpoint();
+
+        // Generate WAL segments for filling WAL archive folder.
+        for (int i = 0; i < 2 * ig.configuration().getDataStorageConfiguration().getWalSegments(); i++) {
+            sharedMgr.checkpointReadLock();
+
+            try {
+                walMgr.log(new SnapshotRecord(i, false), RolloverType.NEXT_SEGMENT);
+            }
+            finally {
+                sharedMgr.checkpointReadUnlock();
+            }
+        }
+
+        cache.put(2, 2);
+        forceCheckpoint();
+
+        System.out.println("@@@ " + blinkIterations.get() + " blink iterations already completed");
+
+        U.sleep(5000);
+
+        stopGrid();
+        // Re-read the archive several times while the temporary file is still blinking in the background.
+        for (int i = 0; i < 20; i++) {
+            TreeSet<Integer> foundCounters = new TreeSet<>();
+            // try-with-resources closes the iterator at the end of every pass.
+            try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(
+                new IteratorParametersBuilder().from((FileWALPointer)fromPtr).filesOrDirs(archiveWalDir))) {
+                it.forEach(x -> {
+                    WALRecord rec = x.get2();
+
+                    if (rec instanceof PartitionDestroyRecord)
+                        foundCounters.add(((WalRecordCacheGroupAware)rec).groupId());
+                });
+            }
+
+            assertEquals(Integer.valueOf(100), foundCounters.first());
+            assertEquals(Integer.valueOf(recordsCnt - 1), foundCounters.last());
+            assertEquals(recordsCnt - 100, foundCounters.size());
+
+            System.out.println("@@@ " + blinkIterations.get() + " blink iterations already completed");
+        }
+
+        stopBlinking.set(true);
+
+        System.out.println("@@@ " + blinkIterations.get() + " blink iterations finally completed");
+
+        blinkFut.get();
+    }
+
+    /**
+     * Checks that iteration started from a pointer in the WAL archive returns every later record, across segments.
+     * NOTE(review): no {@code @Test} annotation here - confirm the runner still picks this method up.
+     */
+    public void testBoundedIterationOverSeveralSegments() throws Exception {
+        walCompactionEnabled = true;
+
+        IgniteEx ig = (IgniteEx)startGrid();
+
+        String archiveWalDir = getArchiveWalDirPath(ig);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(
+            new CacheConfiguration<>().setName("c-n").setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+        IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database();
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        WALPointer fromPtr = null;
+
+        int recordsCnt = WAL_SEGMENT_SIZE / 8 /* record size */ * 5;
+
+        for (int i = 0; i < recordsCnt; i++) {
+            WALPointer ptr = walMgr.log(new PartitionDestroyRecord(i, i));
+
+            if (i == 100)
+                fromPtr = ptr;
+        }
+
+        assertNotNull(fromPtr);
+
+        cache.put(1, 1);
+        forceCheckpoint();
+
+        // Generate WAL segments for filling WAL archive folder.
+        for (int i = 0; i < 2 * ig.configuration().getDataStorageConfiguration().getWalSegments(); i++) {
+            sharedMgr.checkpointReadLock();
+
+            try {
+                walMgr.log(new SnapshotRecord(i, false), RolloverType.NEXT_SEGMENT);
+            }
+            finally {
+                sharedMgr.checkpointReadUnlock();
+            }
+        }
+
+        cache.put(2, 2);
+
+        forceCheckpoint();
+
+        U.sleep(5000);
+
+        stopGrid();
+
+        TreeSet<Integer> foundCounters = new TreeSet<>();
+        // try-with-resources closes the iterator once the scan is done.
+        try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(
+            new IteratorParametersBuilder().from((FileWALPointer)fromPtr).filesOrDirs(archiveWalDir))) {
+            it.forEach(x -> {
+                WALRecord rec = x.get2();
+
+                if (rec instanceof PartitionDestroyRecord)
+                    foundCounters.add(((WalRecordCacheGroupAware)rec).groupId());
+            });
+        }
+
+        assertEquals(Integer.valueOf(100), foundCounters.first());
+        assertEquals(Integer.valueOf(recordsCnt - 1), foundCounters.last());
+        assertEquals(recordsCnt - 100, foundCounters.size());
+    }
+
+    /**
+     * Check correct closing file descriptors.
+     *
+     */
     private String createWalFiles() throws Exception {
         IgniteEx ig = (IgniteEx)startGrid();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTrackerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTrackerTest.java
new file mode 100644
index 0000000..39ed106
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTrackerTest.java
@@ -0,0 +1,853 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ * 
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridDebug;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit test for {@link LocalPendingTransactionsTracker}
+ */
+public class LocalPendingTransactionsTrackerTest {
+    /** Timeout executor. */
+    private static ScheduledExecutorService timeoutExecutor;
+
+    /** Tracker. */
+    private LocalPendingTransactionsTracker tracker;
+
+    /**
+     * Creates the shared timeout executor and sets the IGNITE_PENDING_TX_TRACKER_ENABLED system property.
+     */
+    @BeforeClass
+    public static void setUpClass() {
+        timeoutExecutor = new ScheduledThreadPoolExecutor(1);
+
+        U.onGridStart();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED, "true");
+    }
+
+    /**
+     * Shuts down the timeout executor and clears the system property set by {@link #setUpClass()}.
+     */
+    @AfterClass
+    public static void tearDownClass() {
+        timeoutExecutor.shutdown();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED);
+    }
+
+    /**
+     * Creates a fresh tracker over a mocked shared context; timeout objects are run by the timeout executor.
+     */
+    @Before
+    public void setUp() {
+        GridTimeoutProcessor time = Mockito.mock(GridTimeoutProcessor.class);
+        Mockito.when(time.addTimeoutObject(Mockito.any())).thenAnswer(mock -> {
+            GridTimeoutObject timeoutObj = (GridTimeoutObject)mock.getArguments()[0];
+
+            long endTime = timeoutObj.endTime();
+
+            timeoutExecutor.schedule(timeoutObj::onTimeout, endTime - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+
+            return null;
+        });
+
+        GridCacheSharedContext<?, ?> cctx = Mockito.mock(GridCacheSharedContext.class);
+        Mockito.when(cctx.time()).thenReturn(time);
+        Mockito.when(cctx.logger(LocalPendingTransactionsTracker.class)).thenReturn(new GridTestLog4jLogger());
+
+        tracker = new LocalPendingTransactionsTracker(cctx);
+    }
+
+    /**
+     * Checks that {@code currentlyPreparedTxs()} reports prepared transactions that are not committed yet.
+     */
+    @Test
+    public void testCurrentlyPreparedTxs() {
+        txPrepare(1);
+        txKeyWrite(1, 10);
+        txKeyWrite(1, 11);
+
+        txPrepare(2);
+        txKeyWrite(2, 20);
+        txKeyWrite(2, 21);
+        txKeyWrite(2, 22);
+
+        txPrepare(3);
+        txKeyWrite(3, 30);
+
+        txCommit(2); // Only tx 1 and tx 3 are expected to remain prepared.
+
+        tracker.writeLockState();
+
+        try {
+            Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+            assertEquals(2, currentlyPreparedTxs.size());
+            assertTrue(currentlyPreparedTxs.contains(nearXidVersion(1)));
+            assertTrue(currentlyPreparedTxs.contains(nearXidVersion(3)));
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        txKeyWrite(3, 31);
+        txCommit(3); // Only tx 1 is expected to remain prepared.
+
+        tracker.writeLockState();
+
+        try {
+            Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+            assertEquals(1, currentlyPreparedTxs.size());
+            assertTrue(currentlyPreparedTxs.contains(nearXidVersion(1)));
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+    }
+
+    /**
+     * Checks that a transaction stays prepared until it is committed as many times as it was prepared.
+     */
+    @Test
+    public void testMultiplePrepareCommitMarkers() {
+        txPrepare(1);
+        txKeyWrite(1, 10);
+
+        txPrepare(2);
+        txKeyWrite(2, 20);
+        txPrepare(2);
+        txKeyWrite(2, 21);
+        txPrepare(2);
+        txKeyWrite(2, 22);
+
+        txPrepare(3);
+        txKeyWrite(3, 30);
+        txPrepare(3);
+        txKeyWrite(3, 31);
+
+        txCommit(3);
+        txCommit(3);
+
+        txCommit(1);
+
+        txCommit(2);
+        txCommit(2); // Two commits against three prepares: tx 2 is expected to stay prepared.
+
+        tracker.writeLockState();
+
+        try {
+            Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+            assertEquals(1, currentlyPreparedTxs.size());
+            assertTrue(currentlyPreparedTxs.contains(nearXidVersion(2)));
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+    }
+
+    /**
+     * Checks that the tracker rejects a commit that has no matching prepare.
+     */
+    @Test
+    public void testCommitsMoreThanPreparesForbidden() {
+        txPrepare(1);
+
+        txKeyWrite(1, 10);
+        txKeyWrite(1, 11);
+
+        txCommit(1);
+
+        try {
+            txCommit(1);
+        }
+        catch (Throwable ignored) {
+            return; // Expected. fail() stays outside the try so this catch cannot swallow its AssertionError.
+        }
+
+        fail("We should fail if number of commits is more than number of prepares.");
+    }
+
+    /**
+     * Checks that rolled back transactions are not reported as prepared, whatever markers preceded the rollback.
+     */
+    @Test
+    public void testRollback() {
+        txRollback(1); // Tx can be rolled back before prepare.
+
+        txPrepare(2);
+        txKeyWrite(2, 20);
+
+        txPrepare(3);
+        txKeyWrite(3, 30);
+        txPrepare(3);
+        txKeyWrite(3, 31);
+
+        txCommit(3); // One commit against two prepares, so tx 3 is rolled back while partially committed.
+
+        txRollback(2);
+        txRollback(3);
+
+        tracker.writeLockState();
+
+        try {
+            Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+            assertEquals(0, currentlyPreparedTxs.size());
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test(timeout = 10_000)
+    public void testAwaitFinishOfPreparedTxs() throws Exception {
+        txPrepare(1);
+
+        txPrepare(2);
+        txPrepare(2);
+
+        txPrepare(3);
+        txPrepare(3);
+        txCommit(3);
+
+        txPrepare(4);
+        txCommit(4);
+
+        txPrepare(5);
+        txPrepare(5);
+        txPrepare(5);
+        txCommit(5);
+
+        tracker.writeLockState();
+
+        IgniteInternalFuture<Set<GridCacheVersion>> fut;
+        try {
+            tracker.startTxFinishAwaiting(1_000, 10_000);
+
+            fut = tracker.awaitPendingTxsFinished(Collections.emptySet());
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        Thread.sleep(100);
+
+        txCommit(5); // Second commit against three prepares of tx 5.
+        txCommit(2);
+        txCommit(2); // Tx 2 now has as many commits as prepares.
+
+        long curTs = U.currentTimeMillis();
+
+        Set<GridCacheVersion> pendingTxs = fut.get();
+
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too long", U.currentTimeMillis() - curTs < 1_000);
+
+        assertEquals(3, pendingTxs.size());
+        assertTrue(pendingTxs.contains(nearXidVersion(1)));
+        assertTrue(pendingTxs.contains(nearXidVersion(3)));
+        assertTrue(pendingTxs.contains(nearXidVersion(5)));
+
+        txCommit(1); // Issue the missing commits so that nothing is pending for the second await.
+        txCommit(3);
+        txCommit(5);
+
+        tracker.writeLockState();
+
+        try {
+            tracker.startTxFinishAwaiting(1_000, 10_000);
+
+            fut = tracker.awaitPendingTxsFinished(Collections.emptySet());
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertTrue(fut.get().isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test(timeout = 10_000)
+    public void testAwaitFinishOfPreparedTxsTimeouts() throws Exception {
+        txPrepare(1);
+        txCommit(1);
+
+        txPrepare(2);
+        txKeyRead(2, 10);
+
+        txPrepare(3);
+        txKeyWrite(3, 11);
+
+        txPrepare(4);
+
+        long curTs, waitMs;
+
+        Set<GridCacheVersion> pendingTxs;
+
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(1);
+        final CountDownLatch latch3 = new CountDownLatch(1);
+        final CountDownLatch latch4 = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                latch1.countDown(); // Tell the test thread that the committer thread is running.
+
+                latch2.await();
+
+                txCommit(2);
+
+                latch3.await();
+
+                Thread.sleep(200);
+                txCommit(3);
+
+                latch4.await();
+
+                Thread.sleep(200);
+                txCommit(4);
+            }
+            catch (InterruptedException ignored) {
+                // No-op: the committer thread simply exits when interrupted.
+            }
+        }).start();
+
+        latch1.await();
+
+        pendingTxs = awaitFinishOfPreparedTxs(100, 200);
+
+        assertEquals(3, pendingTxs.size());
+        assertTrue(pendingTxs.contains(nearXidVersion(2)));
+        assertTrue(pendingTxs.contains(nearXidVersion(3)));
+        assertTrue(pendingTxs.contains(nearXidVersion(4)));
+
+        latch2.countDown(); // Lets the committer thread commit tx 2.
+
+        curTs = U.currentTimeMillis();
+
+        pendingTxs = awaitFinishOfPreparedTxs(100, 200);
+
+        waitMs = U.currentTimeMillis() - curTs;
+
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too short: " + waitMs, waitMs > 200 - 50);
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too long: " + waitMs, waitMs < 200 + 500);
+
+        assertEquals(2, pendingTxs.size());
+        assertTrue(pendingTxs.contains(nearXidVersion(3)));
+        assertTrue(pendingTxs.contains(nearXidVersion(4)));
+
+        latch3.countDown(); // Lets the committer thread commit tx 3 after a 200 ms sleep.
+
+        curTs = U.currentTimeMillis();
+
+        pendingTxs = awaitFinishOfPreparedTxs(100, 300);
+
+        waitMs = U.currentTimeMillis() - curTs;
+
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too short: " + waitMs, waitMs > 200 - 50);
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too long: " + waitMs, waitMs < 200 + 500);
+
+        assertEquals(1, pendingTxs.size());
+        assertTrue(pendingTxs.contains(nearXidVersion(4)));
+
+        latch4.countDown(); // Lets the committer thread commit tx 4 after a 200 ms sleep.
+
+        curTs = U.currentTimeMillis();
+
+        pendingTxs = awaitFinishOfPreparedTxs(300, 500);
+
+        waitMs = U.currentTimeMillis() - curTs;
+
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too short: " + waitMs, waitMs > 200 - 50);
+        assertTrue("Waiting for awaitFinishOfPreparedTxs future too long: " + waitMs, waitMs < 200 + 500);
+
+        assertTrue(pendingTxs.isEmpty());
+    }
+
+    /**
+     * Checks that only commits seen between start and stop of committed tracking are reported.
+     */
+    @Test
+    public void trackingCommittedTest() {
+        txPrepare(1);
+        txCommit(1); // Committed before tracking starts: must not be reported.
+
+        txPrepare(2);
+
+        tracker.writeLockState();
+        try {
+            tracker.startTrackingCommitted();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        txCommit(2);
+
+        txPrepare(3);
+        txCommit(3);
+
+        txPrepare(4); // Never committed: must not be reported.
+
+        tracker.writeLockState();
+
+        Set<GridCacheVersion> committedTxs;
+        try {
+            committedTxs = tracker.stopTrackingCommitted().committedTxs();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertEquals(2, committedTxs.size());
+        assertTrue(committedTxs.contains(nearXidVersion(2)));
+        assertTrue(committedTxs.contains(nearXidVersion(3)));
+    }
+
+    /**
+     * Checks that only prepares seen between start and stop of prepared tracking are reported.
+     */
+    @Test
+    public void trackingPreparedTest() {
+        txPrepare(1);
+        txCommit(1);
+
+        txPrepare(2); // Prepared before tracking starts: must not be reported.
+
+        tracker.writeLockState();
+        try {
+            tracker.startTrackingPrepared();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        txCommit(2);
+
+        txPrepare(3);
+        txCommit(3);
+
+        txPrepare(4);
+
+        tracker.writeLockState();
+
+        Set<GridCacheVersion> preparedTxs;
+        try {
+            preparedTxs = tracker.stopTrackingPrepared();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertEquals(2, preparedTxs.size());
+        assertTrue(preparedTxs.contains(nearXidVersion(3)));
+        assertTrue(preparedTxs.contains(nearXidVersion(4)));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test(timeout = 10_000)
+    public void testConsistentCutUseCase() throws Exception {
+        txPrepare(1);
+        txPrepare(2);
+        txPrepare(3);
+
+        txCommit(3); // Committed before cut 1, so it must not show up in the committed set below.
+
+        tracker.writeLockState(); // Cut 1.
+
+        IgniteInternalFuture<Set<GridCacheVersion>> awaitFutCut1;
+        try {
+            tracker.startTrackingCommitted();
+
+            tracker.startTxFinishAwaiting(1_000, 10_000);
+
+            awaitFutCut1 = tracker.awaitPendingTxsFinished(Collections.emptySet());
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        txCommit(1);
+
+        Set<GridCacheVersion> failedToFinish = awaitFutCut1.get();
+
+        assertEquals(1, failedToFinish.size());
+        assertTrue(failedToFinish.contains(nearXidVersion(2)));
+
+        txCommit(2);
+
+        txPrepare(4);
+        txCommit(4);
+
+        txPrepare(5);
+
+        txPrepare(6);
+
+        tracker.writeLockState(); // Cut 2.
+
+        Set<GridCacheVersion> committedFrom1to2;
+        Set<GridCacheVersion> preparedOn2;
+        try {
+            committedFrom1to2 = tracker.stopTrackingCommitted().committedTxs();
+
+            preparedOn2 = tracker.currentlyPreparedTxs();
+
+            tracker.startTrackingPrepared();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertEquals(2, preparedOn2.size());
+        assertTrue(preparedOn2.contains(nearXidVersion(5)));
+        assertTrue(preparedOn2.contains(nearXidVersion(6)));
+
+        assertEquals(3, committedFrom1to2.size());
+        assertTrue(committedFrom1to2.contains(nearXidVersion(1)));
+        assertTrue(committedFrom1to2.contains(nearXidVersion(2)));
+        assertTrue(committedFrom1to2.contains(nearXidVersion(4)));
+
+        txPrepare(7);
+        txPrepare(8);
+
+        txCommit(6);
+        txCommit(7); // Committed after being prepared between cuts 2 and 3; still expected in the prepared set.
+
+        tracker.writeLockState(); // Cut 3.
+        Set<GridCacheVersion> preparedFrom2to3;
+        try {
+            preparedFrom2to3 = tracker.stopTrackingPrepared();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertEquals(2, preparedFrom2to3.size());
+        assertTrue(preparedFrom2to3.contains(nearXidVersion(7)));
+        assertTrue(preparedFrom2to3.contains(nearXidVersion(8)));
+    }
+
+    /**
+     * Checks the dependent transactions graph: only (w -> r) and (w -> w) on the same key are dependencies.
+     */
+    @Test
+    public void testDependentTransactions() {
+        tracker.writeLockState();
+        try {
+            tracker.startTrackingCommitted();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        txPrepare(1);
+        txKeyRead(1, 0);
+        txKeyWrite(1, 10);
+        txKeyRead(1, 20);
+        txCommit(1);
+
+        txPrepare(2);
+        txKeyWrite(2, 30);
+        txKeyWrite(2, 40);
+        txCommit(2);
+
+        txPrepare(3);
+        txKeyRead(3, 10); // (w -> r) is a dependency
+        txCommit(3);
+
+        txPrepare(4);
+        txKeyWrite(4, 20); // (r -> w) is not a dependency
+        txCommit(4);
+
+        txPrepare(5);
+        txKeyRead(5, 30); // (w -> r) is a dependency
+        txCommit(5);
+
+        txPrepare(6);
+        txKeyWrite(6, 40); // (w -> w) is a dependency
+        txCommit(6);
+
+        txPrepare(7);
+        txKeyRead(7, 0); // (r -> r) is not a dependency
+        txCommit(7);
+
+        tracker.writeLockState();
+
+        TrackCommittedResult res;
+        try {
+            res = tracker.stopTrackingCommitted();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertEquals(7, res.committedTxs().size());
+        assertEquals(2, res.dependentTxsGraph().size()); // Only the writers tx 1 and tx 2 have dependents.
+
+        assertTrue(res.dependentTxsGraph().containsKey(nearXidVersion(1)));
+        assertTrue(res.dependentTxsGraph().containsKey(nearXidVersion(2)));
+
+        Set<GridCacheVersion> dependentFrom1 = res.dependentTxsGraph().get(nearXidVersion(1));
+        assertEquals(1, dependentFrom1.size());
+        assertTrue(dependentFrom1.contains(nearXidVersion(3)));
+
+        Set<GridCacheVersion> dependentFrom2 = res.dependentTxsGraph().get(nearXidVersion(2));
+        assertEquals(2, dependentFrom2.size());
+        assertTrue(dependentFrom2.contains(nearXidVersion(5)));
+        assertTrue(dependentFrom2.contains(nearXidVersion(6)));
+    }
+
+    /**
+     * Compares heap dump sizes taken after a short and a longer tracker workload to detect a memory leak.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTrackerMemoryLeak() throws Exception {
+        int allowedLeakSize = 100 * 1024; // Tolerated growth of the heap dump file, in bytes.
+
+        // Warmup phase.
+        long sizeBefore = memoryFootprintForTransactionTracker(1000, 20);
+
+        // Main phase.
+        long sizeAfter = memoryFootprintForTransactionTracker(5000, 20);
+
+        assertTrue("Possible memory leak detected. Memory consumed before transaction tracking: " + sizeBefore +
+            ", memory consumed after transaction tracking: " + sizeAfter, sizeAfter - sizeBefore < allowedLeakSize);
+    }
+
+    /**
+     * @param iterationsCnt Number of transactions each worker thread runs against the tracker.
+     * @param threadsCnt Number of concurrent worker threads.
+     * @return Length in bytes of the heap dump file written after the workload (the file is then deleted).
+     * @throws Exception If failed.
+     */
+    private long memoryFootprintForTransactionTracker(int iterationsCnt, int threadsCnt) throws Exception {
+        AtomicInteger txCnt = new AtomicInteger();
+
+        AtomicInteger trackerState = new AtomicInteger();
+
+        File dumpFile = new File(U.defaultWorkDirectory(), "test.hprof");
+
+        String heapDumpFileName = dumpFile.getAbsolutePath();
+
+        Runnable txRunnable = new Runnable() {
+            @Override public void run() {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int iteration = 0; iteration < iterationsCnt; iteration++) {
+                    int txId = txCnt.incrementAndGet(); // Unique transaction ID across all worker threads.
+
+                    txPrepare(txId);
+
+                    int opCnt = rnd.nextInt(100);
+
+                    for (int i = 0; i < opCnt; i++) {
+                        if (rnd.nextBoolean())
+                            txKeyRead(txId, rnd.nextInt());
+                        else
+                            txKeyWrite(txId, rnd.nextInt());
+                    }
+
+                    if (rnd.nextInt(10) == 0)
+                        txRollback(txId);
+                    else
+                        txCommit(txId);
+
+                    // Change tracker state
+                    if (rnd.nextInt(20) == 0) {
+                        tracker.writeLockState();
+
+                        try {
+                            int state = trackerState.getAndIncrement(); // Cycles through the four start/stop calls.
+
+                            switch (state % 4) {
+                                case 0:
+                                    tracker.startTrackingPrepared();
+
+                                    break;
+                                case 1:
+                                    tracker.stopTrackingPrepared();
+
+                                    break;
+                                case 2:
+                                    tracker.startTrackingCommitted();
+
+                                    break;
+                                case 3:
+                                    tracker.stopTrackingCommitted();
+                            }
+                        }
+                        finally {
+                            tracker.writeUnlockState();
+                        }
+                    }
+                }
+            }
+        };
+
+        GridTestUtils.runMultiThreaded(txRunnable, threadsCnt, "tx-runner");
+
+        tracker.writeLockState();
+
+        try {
+            int state = trackerState.get(); // Index of the start/stop call the workers would have made next.
+
+            switch (state % 4) {
+                case 0:
+                    break;
+                case 1:
+                    tracker.stopTrackingPrepared();
+                    tracker.startTrackingCommitted();
+                    tracker.stopTrackingCommitted();
+
+                    break;
+                case 2:
+                    tracker.startTrackingCommitted();
+                    tracker.stopTrackingCommitted();
+
+                    break;
+                case 3:
+                    tracker.stopTrackingCommitted();
+            }
+
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        GridDebug.dumpHeap(heapDumpFileName, true);
+
+        long fileSize = dumpFile.length();
+
+        dumpFile.delete();
+
+        return fileSize;
+    }
+
+    /**
+     * @param txId Test transaction ID to report to the tracker as prepared.
+     */
+    private void txPrepare(int txId) {
+        tracker.onTxPrepared(nearXidVersion(txId));
+    }
+
+    /**
+     * @param txId Test transaction ID to report to the tracker as committed.
+     */
+    private void txCommit(int txId) {
+        tracker.onTxCommitted(nearXidVersion(txId));
+    }
+
+    /**
+     * @param txId Test transaction ID to report to the tracker as rolled back.
+     */
+    private void txRollback(int txId) {
+        tracker.onTxRolledBack(nearXidVersion(txId));
+    }
+
+    /**
+     * @param txId Test transaction ID for which the key write is reported to the tracker.
+     * @param key Integer key, wrapped into a {@link KeyCacheObjectImpl} together with its 4-byte encoding.
+     */
+    private void txKeyWrite(int txId, int key) {
+        KeyCacheObjectImpl keyCacheObj = new KeyCacheObjectImpl(key, ByteBuffer.allocate(4).putInt(key).array(), 1);
+
+        tracker.onKeysWritten(nearXidVersion(txId), Collections.singletonList(keyCacheObj));
+    }
+
+    /**
+     * @param txId Test transaction ID for which the key read is reported to the tracker.
+     * @param key Integer key, wrapped into a {@link KeyCacheObjectImpl} together with its 4-byte encoding.
+     */
+    private void txKeyRead(int txId, int key) {
+        KeyCacheObjectImpl keyCacheObj = new KeyCacheObjectImpl(key, ByteBuffer.allocate(4).putInt(key).array(), 1);
+
+        tracker.onKeysRead(nearXidVersion(txId), Collections.singletonList(keyCacheObj));
+    }
+
+    /**
+     * @param txId Test transaction ID.
+     * @return New {@code GridCacheVersion(0, txId, 0)} identifying the test transaction in tracker calls.
+     */
+    private GridCacheVersion nearXidVersion(int txId) {
+        return new GridCacheVersion(0, txId, 0);
+    }
+
+    /**
+     * @param preparedTxsTimeout Prepared transactions timeout, passed to {@code startTxFinishAwaiting}.
+     * @param committingTxsTimeout Committing transactions timeout, passed to {@code startTxFinishAwaiting}.
+     * @return Result of the future returned by {@code awaitPendingTxsFinished} for an empty set argument.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Set<GridCacheVersion> awaitFinishOfPreparedTxs(
+        long preparedTxsTimeout,
+        long committingTxsTimeout
+    ) throws IgniteCheckedException {
+        IgniteInternalFuture<Set<GridCacheVersion>> fut;
+
+        tracker.writeLockState();
+
+        try {
+            tracker.startTxFinishAwaiting(preparedTxsTimeout, committingTxsTimeout);
+
+            fut = tracker.awaitPendingTxsFinished(Collections.emptySet());
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        return fut.get(); // Waited on only after the state write lock has been released.
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java
index fc995a8..90dfd98 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java
@@ -17,9 +17,13 @@
 package org.apache.ignite.lang.utils;
 
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntConsumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.util.GridCircularBuffer;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -177,4 +181,79 @@ public class GridCircularBufferSelfTest extends GridCommonAbstractTest {
 
         info("Buffer: " + buf);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testEmptyBufIterator() throws Exception {
+        assertFalse(new GridCircularBuffer<>(8).iterator().hasNext()); // Nothing was added to the buffer.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHalfFullBufIterator() throws Exception {
+        int size = 8;
+
+        GridCircularBuffer<Integer> buf = new GridCircularBuffer<>(size);
+
+        IntStream.range(0, size / 2).forEach(makeConsumer(buf)); // Adds 0..3 to a buffer created with size 8.
+
+        checkExpectedRange(0, size / 2, buf);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFullBufIterator() throws Exception {
+        int size = 8;
+
+        GridCircularBuffer<Integer> buf = new GridCircularBuffer<>(size);
+
+        IntStream.range(0, size).forEach(makeConsumer(buf)); // Adds 0..7 to a buffer created with size 8.
+
+        checkExpectedRange(0, size, buf);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testOverflownBufIterator() throws Exception {
+        int size = 8;
+
+        GridCircularBuffer<Integer> buf = new GridCircularBuffer<>(size);
+
+        IntStream.range(0, 3 * size / 2).forEach(makeConsumer(buf)); // Adds 0..11 to a buffer created with size 8.
+
+        checkExpectedRange(size / 2, 3 * size / 2, buf); // Only the last 8 added values (4..11) are expected.
+    }
+
+    /**
+     * Returns a consumer that adds each value to {@code buf}, rethrowing interruption as an unchecked exception.
+     */
+    private static IntConsumer makeConsumer(GridCircularBuffer<Integer> buf) {
+        return t -> {
+            try {
+                buf.add(t);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedException(e);
+            }
+        };
+    }
+
+    /**
+     * Asserts that the iterator of {@code buf} yields exactly the integers from the given range, in order.
+     */
+    private void checkExpectedRange(int beginInclusive, int endExclusive, GridCircularBuffer<Integer> buf) {
+        Iterator<Integer> iter = buf.iterator();
+
+        IntStream.range(beginInclusive, endExclusive).forEach(i -> assertEquals(i, iter.next().intValue()));
+
+        assertFalse(iter.hasNext()); // No elements may follow the expected range.
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 7797f23..f45df11 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.integration.CompletionListener;
@@ -590,6 +591,21 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
         Ignite crd = null;
 
         for (Ignite g : G.allGrids()) {
+            if (nodes != null) {
+                Set<UUID> gClusterNodeIds = g.cluster().nodes().stream()
+                    .map(ClusterNode::id)
+                    .collect(Collectors.toSet());
+
+                Set<UUID> awaitPmeNodeIds = nodes.stream()
+                    .map(ClusterNode::id)
+                    .collect(Collectors.toSet());
+
+                gClusterNodeIds.retainAll(awaitPmeNodeIds);
+
+                if (gClusterNodeIds.isEmpty())
+                    continue; // Node g is from another cluster and can't be elected as coordinator.
+            }
+
             ClusterNode node = g.cluster().localNode();
 
             if (crd == null || node.order() < crd.cluster().localNode().order()) {
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 490f7f5..f62d110 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -2747,7 +2747,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
             db.checkpointReadLock();
 
             try {
-                U.invoke(GridCacheDatabaseSharedManager.class, db, "applyUpdate", ctx, dataEntry);
+                U.invoke(GridCacheDatabaseSharedManager.class, db, "applyUpdate", ctx, dataEntry, false);
             }
             finally {
                 db.checkpointReadUnlock();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java
index 6fdb03a..c0ec2a6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java
@@ -59,7 +59,7 @@ public class ClusterReadOnlyModeSqlTest extends ClusterReadOnlyModeAbstractTest
                     cur.getAll();
                 }
 
-                boolean failed = false;
+                Throwable failed = null;
 
                 try (FieldsQueryCursor<?> cur = cache.query(new SqlFieldsQuery("DELETE FROM Integer"))) {
                     cur.getAll();
@@ -68,13 +68,15 @@ public class ClusterReadOnlyModeSqlTest extends ClusterReadOnlyModeAbstractTest
                     if (!readOnly)
                         log.error("Failed to delete data", ex);
 
-                    failed = true;
+                    failed = ex;
                 }
 
-                if (failed != readOnly)
+                if ((failed == null) == readOnly)
                     fail("SQL delete from " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
 
-                failed = false;
+                checkThatRootCauseIsReadOnly(failed);
+
+                failed = null;
 
                 try (FieldsQueryCursor<?> cur = cache.query(new SqlFieldsQuery(
                     "INSERT INTO Integer(_KEY, _VAL) VALUES (?, ?)").setArgs(rnd.nextInt(1000), rnd.nextInt()))) {
@@ -84,11 +86,13 @@ public class ClusterReadOnlyModeSqlTest extends ClusterReadOnlyModeAbstractTest
                     if (!readOnly)
                         log.error("Failed to insert data", ex);
 
-                    failed = true;
+                    failed = ex;
                 }
 
-                if (failed != readOnly)
+                if ((failed == null) == readOnly)
                     fail("SQL insert into " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
+
+                checkThatRootCauseIsReadOnly(failed);
             }
         }
     }
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 5651060..579eace 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -410,6 +410,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements IgniteDis
         return impl.gridStartTime();
     }
 
+
+    /**
+     * Sets grid start time.
+     *
+     * @param val New time value.
+     */
+    @SuppressWarnings("unused")
+    public void setGridStartTime(long val) {
+        impl.setGridStartTime(val);
+    }
+
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
         IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr;
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 92c9658..f894a75 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -47,7 +47,7 @@ class ZkRuntimeState {
     int joinDataPartCnt;
 
     /** */
-    long gridStartTime;
+    volatile long gridStartTime;
 
     /** */
     volatile boolean joined;
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c00c637..95fadb7 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -705,6 +705,15 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * Sets grid start time.
+     *
+     * @param val New time value.
+     */
+    public void setGridStartTime(long val) {
+        rtState.gridStartTime = val;
+    }
+
+    /**
      * Starts join procedure and waits for {@link EventType#EVT_NODE_JOINED} event for local node.
      *
      * @throws InterruptedException If interrupted.
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java
index 9913f61..36b04d9a 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java
@@ -32,6 +32,15 @@ public class ZookeeperDiscoverySpiTestUtil {
      * @return Test cluster.
      */
     public static TestingCluster createTestingCluster(int instances) {
+        return createTestingCluster(instances, 0);
+    }
+
+    /**
+     * @param instances Number of instances in the test cluster.
+     * @param firstInstanceIdx Index of the first instance; instances are numbered consecutively from this value.
+     * @return Test cluster.
+     */
+    public static TestingCluster createTestingCluster(int instances, int firstInstanceIdx) {
         String tmpDir;
 
         tmpDir = System.getenv("TMPFS_ROOT") != null
@@ -39,7 +48,7 @@ public class ZookeeperDiscoverySpiTestUtil {
 
         List<InstanceSpec> specs = new ArrayList<>();
 
-        for (int i = 0; i < instances; i++) {
+        for (int i = firstInstanceIdx, n = firstInstanceIdx + instances; i < n; i++) {
             File file = new File(tmpDir, "apacheIgniteTestZk-" + i);
 
             if (file.isDirectory())
diff --git a/parent/pom.xml b/parent/pom.xml
index 3546ebe..8564221 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -490,7 +490,7 @@
                                 <packages>org.apache.ignite.springdata20.repository*</packages>
                             </group>
                             <group>
-                            <title>RocketMQ integration</title>
+                                <title>RocketMQ integration</title>
                                 <packages>org.apache.ignite.stream.rocketmq*</packages>
                             </group>
                             <group>