You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/25 15:34:27 UTC

[ignite] branch ignite-cdc updated: IGNITE-13596 Flag to distinguish DataRecord on primary and backup added (#8904)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-cdc
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-cdc by this push:
     new db8bb81  IGNITE-13596 Flag to distinguish DataRecord on primary and backup added (#8904)
db8bb81 is described below

commit db8bb814c5c5a4876b21de7a22126ed74d6ffddf
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Mar 25 18:34:03 2021 +0300

    IGNITE-13596 Flag to distinguish DataRecord on primary and backup added (#8904)
---
 .../util/GridCommandHandlerClusterByClassTest.java |   3 +-
 .../apache/ignite/util/GridCommandHandlerTest.java |   3 +-
 .../internal/pagemem/wal/record/DataEntry.java     |  16 +++-
 .../internal/pagemem/wal/record/DataRecord.java    |   2 +-
 .../internal/pagemem/wal/record/LazyDataEntry.java |   6 +-
 .../internal/pagemem/wal/record/MvccDataEntry.java |   2 +-
 .../pagemem/wal/record/UnwrapDataEntry.java        |   6 +-
 .../internal/pagemem/wal/record/WALRecord.java     |  23 ++++-
 .../processors/cache/GridCacheMapEntry.java        |  42 ++++++---
 .../GridDistributedTxRemoteAdapter.java            |   3 +-
 .../dht/colocated/GridDhtDetachedCacheEntry.java   |   8 +-
 .../cache/distributed/near/GridNearCacheEntry.java |   8 +-
 .../GridCacheDatabaseSharedManager.java            |   3 +
 .../wal/reader/StandaloneWalRecordsIterator.java   |   7 +-
 .../wal/serializer/RecordDataV1Serializer.java     |  47 ++++++----
 .../wal/serializer/RecordDataV2Serializer.java     |  10 ++-
 .../cache/transactions/IgniteTxLocalAdapter.java   |   3 +-
 .../java/org/apache/ignite/TestStorageUtils.java   |   3 +-
 .../IgnitePdsSporadicDataRecordsOnBackupTest.java  |   2 +-
 ...CheckpointSimulationWithRealCpDisabledTest.java |   2 +-
 .../persistence/db/wal/IgniteWalRebalanceTest.java |   3 +-
 .../db/wal/reader/IgniteWalReaderTest.java         | 100 ++++++++++++++++++++-
 .../testframework/wal/record/RecordUtils.java      |   9 ++
 .../ignite/development/utils/DataEntryWrapper.java |   3 +-
 .../apache/ignite/development/utils/WalStat.java   |   2 +-
 .../utils/IgniteWalConverterArgumentsTest.java     |   4 +-
 .../utils/IgniteWalConverterSensitiveDataTest.java |   3 +-
 .../development/utils/IgniteWalConverterTest.java  |   4 +-
 28 files changed, 257 insertions(+), 70 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
index 4014d4e..aa55b91 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
@@ -1523,7 +1523,8 @@ public class GridCommandHandlerClusterByClassTest extends GridCommandHandlerClus
                 new GridCacheVersion(),
                 0L,
                 partId,
-                updateCntr
+                updateCntr,
+                false
             );
 
             GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 80c33ee..0b525ee 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -2630,7 +2630,8 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
                 new GridCacheVersion(),
                 0L,
                 partId,
-                updateCntr
+                updateCntr,
+                false
             );
 
             GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index dd05726..2244897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -61,6 +61,10 @@ public class DataEntry {
     @GridToStringInclude
     protected long partCnt;
 
+    /** If {@code true} then change made on primary node. */
+    @GridToStringInclude
+    protected boolean primary;
+
     /** Constructor. */
     private DataEntry() {
         // No-op, used from factory methods.
@@ -76,6 +80,7 @@ public class DataEntry {
      * @param expireTime Expire time.
      * @param partId Partition ID.
      * @param partCnt Partition counter.
+     * @param primary {@code True} if node is primary for partition in the moment of logging.
      */
     public DataEntry(
         int cacheId,
@@ -86,7 +91,8 @@ public class DataEntry {
         GridCacheVersion writeVer,
         long expireTime,
         int partId,
-        long partCnt
+        long partCnt,
+        boolean primary
     ) {
         this.cacheId = cacheId;
         this.key = key;
@@ -97,6 +103,7 @@ public class DataEntry {
         this.expireTime = expireTime;
         this.partId = partId;
         this.partCnt = partCnt;
+        this.primary = primary;
 
         // Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
         assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
@@ -177,6 +184,13 @@ public class DataEntry {
         return expireTime;
     }
 
+    /**
+     * @return {@code True} if node is primary for partition in the moment of logging.
+     */
+    public boolean primary() {
+        return primary;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataEntry.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index ef6c3ba..2507fd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -35,7 +35,7 @@ public class DataRecord extends TimeStampRecord {
 
     /** {@inheritDoc} */
     @Override public RecordType type() {
-        return RecordType.DATA_RECORD;
+        return RecordType.DATA_RECORD_V2;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
index ba2fabc..e771bdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
@@ -60,6 +60,7 @@ public class LazyDataEntry extends DataEntry implements MarshalledDataEntry {
      * @param expireTime Expire time.
      * @param partId Partition ID.
      * @param partCnt Partition counter.
+     * @param primary {@code True} if node is primary for partition in the moment of logging.
      */
     public LazyDataEntry(
         GridCacheSharedContext cctx,
@@ -73,9 +74,10 @@ public class LazyDataEntry extends DataEntry implements MarshalledDataEntry {
         GridCacheVersion writeVer,
         long expireTime,
         int partId,
-        long partCnt
+        long partCnt,
+        boolean primary
     ) {
-        super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+        super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, primary);
 
         this.cctx = cctx;
         this.keyType = keyType;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
index c86593f..cecd867 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
@@ -56,7 +56,7 @@ public class MvccDataEntry extends DataEntry {
         long partCnt,
         MvccVersion mvccVer
     ) {
-        super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+        super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, false);
 
         this.mvccVer = mvccVer;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
index ad5e1d0..9395142 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
@@ -48,6 +48,7 @@ public class UnwrapDataEntry extends DataEntry implements UnwrappedDataEntry {
      * @param partCnt Partition counter.
      * @param cacheObjValCtx cache object value context for unwrapping objects.
      * @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead.
+     * @param primary {@code True} if node is primary for partition in the moment of logging.
      */
     public UnwrapDataEntry(
         final int cacheId,
@@ -60,8 +61,9 @@ public class UnwrapDataEntry extends DataEntry implements UnwrappedDataEntry {
         final int partId,
         final long partCnt,
         final CacheObjectValueContext cacheObjValCtx,
-        final boolean keepBinary) {
-        super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+        final boolean keepBinary,
+        final boolean primary) {
+        super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, primary);
         this.cacheObjValCtx = cacheObjValCtx;
         this.keepBinary = keepBinary;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index f07b71a..2f95c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -45,7 +45,8 @@ public abstract class WALRecord {
         /** */
         PAGE_RECORD(1, PHYSICAL),
 
-        /** */
+        /** @deprecated Use {@link #DATA_RECORD_V2} instead. */
+        @Deprecated
         DATA_RECORD(2, LOGICAL),
 
         /** Checkpoint (begin) record */
@@ -206,7 +207,11 @@ public abstract class WALRecord {
         /** Encrypted WAL-record. */
         ENCRYPTED_RECORD(52, PHYSICAL),
 
-        /** Ecnrypted data record. */
+        /**
+         * Ecnrypted data record.
+         * @deprecated Use {@link #ENCRYPTED_DATA_RECORD_V3} instead.
+         */
+        @Deprecated
         ENCRYPTED_DATA_RECORD(53, LOGICAL),
 
         /** Mvcc data record. */
@@ -239,7 +244,11 @@ public abstract class WALRecord {
         /** Encrypted WAL-record. */
         ENCRYPTED_RECORD_V2(63, PHYSICAL),
 
-        /** Ecnrypted data record. */
+        /**
+         * Ecnrypted data record.
+         * @deprecated Use {@link #ENCRYPTED_DATA_RECORD_V3} instead.
+         */
+        @Deprecated
         ENCRYPTED_DATA_RECORD_V2(64, LOGICAL),
 
         /** Master key change record containing multiple keys for single cache group. */
@@ -252,7 +261,13 @@ public abstract class WALRecord {
         PARTITION_META_PAGE_DELTA_RECORD_V3(67, PHYSICAL),
 
         /** Index meta page delta record includes encryption status data. */
-        INDEX_META_PAGE_DELTA_RECORD(68, PHYSICAL);
+        INDEX_META_PAGE_DELTA_RECORD(68, PHYSICAL),
+
+        /** Data record V2. */
+        DATA_RECORD_V2(69, LOGICAL),
+
+        /** Ecnrypted data record. */
+        ENCRYPTED_DATA_RECORD_V3(70, LOGICAL);
 
         /** Index for serialization. Should be consistent throughout all versions. */
         private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0221a7..3807404 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1569,7 +1569,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             updateCntr0 = nextPartitionCounter(tx, updateCntr);
 
             if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = logTxUpdate(tx, val, expireTime, updateCntr0);
+                logPtr = logTxUpdate(tx, val, expireTime, updateCntr0, topVer);
 
             update(val, expireTime, ttl, newVer, true);
 
@@ -1791,7 +1791,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             updateCntr0 = nextPartitionCounter(tx, updateCntr);
 
             if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = logTxUpdate(tx, null, 0, updateCntr0);
+                logPtr = logTxUpdate(tx, null, 0, updateCntr0, topVer);
 
             drReplicate(drType, null, newVer, topVer);
 
@@ -2148,7 +2148,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 update(updated, expireTime, ttl, ver, true);
 
-                logUpdate(op, updated, ver, expireTime, 0);
+                logUpdate(op, updated, ver, expireTime, 0, cctx.affinity().affinityTopologyVersion());
 
                 if (evt) {
                     CacheObject evtOld = null;
@@ -2180,7 +2180,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
 
-                logUpdate(op, null, ver, CU.EXPIRE_TIME_ETERNAL, 0);
+                logUpdate(op, null, ver, CU.EXPIRE_TIME_ETERNAL, 0, cctx.affinity().affinityTopologyVersion());
 
                 if (evt) {
                     CacheObject evtOld = null;
@@ -3495,7 +3495,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             ver,
                             expireTime,
                             partition(),
-                            updateCntr
+                            updateCntr,
+                            cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer)
                         )));
                     }
                 }
@@ -4323,9 +4324,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param writeVer Write version.
      * @param expireTime Expire time.
      * @param updCntr Update counter.
+     * @param topVer Topology version.
      */
-    protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer, long expireTime, long updCntr)
-        throws IgniteCheckedException {
+    protected void logUpdate(
+        GridCacheOperation op,
+        CacheObject val,
+        GridCacheVersion writeVer,
+        long expireTime,
+        long updCntr,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
         // We log individual updates only in ATOMIC cache.
         assert cctx.atomic();
 
@@ -4340,7 +4348,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     writeVer,
                     expireTime,
                     partition(),
-                    updCntr)));
+                    updCntr,
+                    cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer))));
         }
         catch (StorageException e) {
             throw new IgniteCheckedException("Failed to log ATOMIC cache update [key=" + key + ", op=" + op +
@@ -4353,10 +4362,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param val Value.
      * @param expireTime Expire time (or 0 if not applicable).
      * @param updCntr Update counter.
+     * @param topVer Topology version.
      * @throws IgniteCheckedException In case of log failure.
      */
-    protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
-        throws IgniteCheckedException {
+    protected WALPointer logTxUpdate(
+        IgniteInternalTx tx,
+        CacheObject val,
+        long expireTime,
+        long updCntr,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
         assert cctx.transactional() && !cctx.transactionalSnapshot();
 
         if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
@@ -4375,7 +4390,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 tx.writeVersion(),
                 expireTime,
                 key.partition(),
-                updCntr)));
+                updCntr,
+                cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer))));
         }
         else
             return null;
@@ -6484,7 +6500,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr);
 
-            entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
+            entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0, topVer);
 
             if (!entry.isNear()) {
                 newRow = entry.localPartition().dataStore().createRow(
@@ -6571,7 +6587,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr);
 
-            entry.logUpdate(op, null, newVer, 0, updateCntr0);
+            entry.logUpdate(op, null, newVer, 0, updateCntr0, topVer);
 
             if (oldVal != null) {
                 assert !entry.deletedUnlocked();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 173888b..7cfe869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -631,7 +631,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                     writeVersion(),
                                                     0,
                                                     txEntry.key().partition(),
-                                                    txEntry.updateCounter()
+                                                    txEntry.updateCounter(),
+                                                    cacheCtx.affinity().primaryByPartition(cctx.localNode(), txEntry.key().partition(), topVer)
                                                 ),
                                                 txEntry
                                             )
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 48b8e8e..8e46629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -73,13 +74,14 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer, long expireTime, long updCntr) throws IgniteCheckedException {
+    @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer,
+        long expireTime, long updCntr, AffinityTopologyVersion topVer) {
         // No-op for detached entries, index is updated on primary or backup nodes.
     }
 
     /** {@inheritDoc} */
-    @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
-        throws IgniteCheckedException {
+    @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
+        AffinityTopologyVersion topVer) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index d8b1615..fc70aa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -471,14 +471,14 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime, long updCntr)
-        throws IgniteCheckedException {
+    @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime,
+        long updCntr, AffinityTopologyVersion topVer) {
         // No-op: queries are disabled for near cache.
     }
 
     /** {@inheritDoc} */
-    @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
-        throws IgniteCheckedException {
+    @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
+        AffinityTopologyVersion topVer) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8fa76c8..8c0228e 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2496,6 +2496,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 switch (rec.type()) {
                     case MVCC_DATA_RECORD:
                     case DATA_RECORD:
+                    case DATA_RECORD_V2:
                         checkpointReadLock();
 
                         try {
@@ -2635,8 +2636,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     case MVCC_DATA_RECORD:
                     case DATA_RECORD:
+                    case DATA_RECORD_V2:
                     case ENCRYPTED_DATA_RECORD:
                     case ENCRYPTED_DATA_RECORD_V2:
+                    case ENCRYPTED_DATA_RECORD_V3:
                         DataRecord dataRec = (DataRecord)rec;
 
                         for (DataEntry dataEntry : dataRec.writeEntries()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 472afba..cfaa670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -354,7 +354,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
         GridKernalContext kernalCtx = sharedCtx.kernalContext();
         IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
 
-        if (processor != null && (rec.type() == RecordType.DATA_RECORD || rec.type() == RecordType.MVCC_DATA_RECORD)) {
+        if (processor != null && (rec.type() == RecordType.DATA_RECORD
+            || rec.type() == RecordType.DATA_RECORD_V2
+            || rec.type() == RecordType.MVCC_DATA_RECORD)) {
             try {
                 return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
             }
@@ -498,7 +500,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
                 dataEntry.partitionId(),
                 dataEntry.partitionCounter(),
                 coCtx,
-                keepBinary);
+                keepBinary,
+                dataEntry.primary());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index bc5f9e3..6ad08dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecordV2;
 import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
@@ -121,7 +122,9 @@ import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V3;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD_V2;
@@ -406,7 +409,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
             case PARTITION_DESTROY:
                 return /*cacheId*/4 + /*partId*/4;
 
-            case DATA_RECORD:
+            case DATA_RECORD_V2:
                 DataRecord dataRec = (DataRecord)record;
 
                 return 4 + dataSize(dataRec);
@@ -667,12 +670,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_RECORD:
+            case DATA_RECORD_V2:
                 int entryCnt = in.readInt();
 
                 List<DataEntry> entries = new ArrayList<>(entryCnt);
 
                 for (int i = 0; i < entryCnt; i++)
-                    entries.add(readPlainDataEntry(in));
+                    entries.add(readPlainDataEntry(in, type));
 
                 res = new DataRecord(entries, 0L);
 
@@ -680,12 +684,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
             case ENCRYPTED_DATA_RECORD:
             case ENCRYPTED_DATA_RECORD_V2:
+            case ENCRYPTED_DATA_RECORD_V3:
                 entryCnt = in.readInt();
 
                 entries = new ArrayList<>(entryCnt);
 
                 for (int i = 0; i < entryCnt; i++)
-                    entries.add(readEncryptedDataEntry(in, type == ENCRYPTED_DATA_RECORD_V2));
+                    entries.add(readEncryptedDataEntry(in, type));
 
                 res = new DataRecord(entries, 0L);
 
@@ -1352,6 +1357,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 break;
 
             case DATA_RECORD:
+            case DATA_RECORD_V2:
                 DataRecord dataRec = (DataRecord)rec;
 
                 buf.putInt(dataRec.writeEntries().size());
@@ -1956,6 +1962,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
         buf.putInt(entry.partitionId());
         buf.putLong(entry.partitionCounter());
         buf.putLong(entry.expireTime());
+
+        if (!(entry instanceof MvccDataEntry))
+            buf.put(entry.primary() ? (byte)1 : 0);
     }
 
     /**
@@ -2002,14 +2011,16 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
     /**
      * @param in Input to read from.
-     * @param readKeyId If {@code true} encryption key identifier will be read from {@code in}.
+     * @param recType Record type.
      * @return Read entry.
      * @throws IOException If failed.
      * @throws IgniteCheckedException If failed.
      */
-    DataEntry readEncryptedDataEntry(ByteBufferBackedDataInput in, boolean readKeyId) throws IOException, IgniteCheckedException {
+    DataEntry readEncryptedDataEntry(ByteBufferBackedDataInput in, RecordType recType) throws IOException, IgniteCheckedException {
         boolean needDecryption = in.readByte() == ENCRYPTED;
 
+        RecordType dataRecordType = recType == ENCRYPTED_DATA_RECORD_V3 ? DATA_RECORD_V2 : DATA_RECORD;
+
         if (needDecryption) {
             if (encSpi == null) {
                 skipEncryptedRecord(in, false);
@@ -2017,22 +2028,23 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 return new EncryptedDataEntry();
             }
 
-            T3<ByteBufferBackedDataInput, Integer, RecordType> clData = readEncryptedData(in, false, readKeyId);
+            T3<ByteBufferBackedDataInput, Integer, RecordType> clData = readEncryptedData(in, false,
+                recType == ENCRYPTED_DATA_RECORD_V2 || recType == ENCRYPTED_DATA_RECORD_V3);
 
             if (clData.get1() == null)
                 return null;
 
-            return readPlainDataEntry(clData.get1());
+            return readPlainDataEntry(clData.get1(), dataRecordType);
         }
 
-        return readPlainDataEntry(in);
+        return readPlainDataEntry(in, dataRecordType);
     }
 
     /**
      * @param in Input to read from.
      * @return Read entry.
      */
-    DataEntry readPlainDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+    DataEntry readPlainDataEntry(ByteBufferBackedDataInput in, RecordType type) throws IOException, IgniteCheckedException {
         int cacheId = in.readInt();
 
         int keySize = in.readInt();
@@ -2061,6 +2073,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
         int partId = in.readInt();
         long partCntr = in.readLong();
         long expireTime = in.readLong();
+        boolean primary = type == DATA_RECORD_V2 && in.readByte() == (byte)1;
 
         GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
@@ -2083,7 +2096,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                     writeVer,
                     expireTime,
                     partId,
-                    partCntr
+                    partCntr,
+                    primary
             );
         }
         else
@@ -2099,7 +2113,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                     writeVer,
                     expireTime,
                     partId,
-                    partCntr);
+                    partCntr,
+                    primary
+            );
     }
 
     /**
@@ -2113,10 +2129,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
         if (needEncryption(rec))
             return ENCRYPTED_RECORD_V2;
 
-        if (rec.type() != DATA_RECORD)
+        if (rec.type() != DATA_RECORD && rec.type() != DATA_RECORD_V2)
             return rec.type();
 
-        return isDataRecordEncrypted((DataRecord)rec) ? ENCRYPTED_DATA_RECORD_V2 : DATA_RECORD;
+        return isDataRecordEncrypted((DataRecord)rec) ? ENCRYPTED_DATA_RECORD_V3 : rec.type();
     }
 
     /**
@@ -2235,7 +2251,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
             /*write ver*/CacheVersionIO.size(entry.writeVersion(), false) +
             /*part ID*/4 +
             /*expire Time*/8 +
-            /*part cnt*/8;
+            /*part cnt*/8 +
+            /*primary*/(entry instanceof MvccDataEntry ? 0 : 1);
     }
 
     /**
@@ -2268,7 +2285,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
     public static class EncryptedDataEntry extends DataEntry {
         /** Constructor. */
         EncryptedDataEntry() {
-            super(0, null, null, READ, null, null, 0, 0, 0);
+            super(0, null, null, READ, null, null, 0, 0, 0, false);
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 8622629..08f132a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -56,8 +56,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
-
 /**
  * Record data V2 serializer.
  */
@@ -96,6 +94,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
                 return 4/*entry count*/ + 8/*timestamp*/ + dataSize((DataRecord)rec);
 
             case DATA_RECORD:
+            case DATA_RECORD_V2:
                 return super.plainSize(rec) + 8/*timestamp*/;
 
             case SNAPSHOT:
@@ -160,13 +159,14 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
                 return cpRec;
 
             case DATA_RECORD:
+            case DATA_RECORD_V2:
                 int entryCnt = in.readInt();
                 long timeStamp = in.readLong();
 
                 List<DataEntry> entries = new ArrayList<>(entryCnt);
 
                 for (int i = 0; i < entryCnt; i++)
-                    entries.add(readPlainDataEntry(in));
+                    entries.add(readPlainDataEntry(in, type));
 
                 return new DataRecord(entries, timeStamp);
 
@@ -183,13 +183,14 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
 
             case ENCRYPTED_DATA_RECORD:
             case ENCRYPTED_DATA_RECORD_V2:
+            case ENCRYPTED_DATA_RECORD_V3:
                 entryCnt = in.readInt();
                 timeStamp = in.readLong();
 
                 entries = new ArrayList<>(entryCnt);
 
                 for (int i = 0; i < entryCnt; i++)
-                    entries.add(readEncryptedDataEntry(in, type == ENCRYPTED_DATA_RECORD_V2));
+                    entries.add(readEncryptedDataEntry(in, type));
 
                 return new DataRecord(entries, timeStamp);
 
@@ -261,6 +262,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
 
             case MVCC_DATA_RECORD:
             case DATA_RECORD:
+            case DATA_RECORD_V2:
                 DataRecord dataRec = (DataRecord)rec;
 
                 buf.putInt(dataRec.writeEntries().size());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7b6cd7f..208a9a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -861,7 +861,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             writeVersion(),
                                             0,
                                             txEntry.key().partition(),
-                                            txEntry.updateCounter())));
+                                            txEntry.updateCounter(),
+                                            cacheCtx.affinity().primaryByPartition(cctx.localNode(), txEntry.key().partition(), topVer))));
                                     }
 
                                     ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
diff --git a/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java b/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java
index 17ff241..e0f415d 100644
--- a/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java
@@ -82,7 +82,8 @@ public class TestStorageUtils {
                 ver,
                 0L,
                 partId,
-                updateCntr
+                updateCntr,
+                false
             );
 
             IgniteCacheDatabaseSharedManager db = ctx.shared().database();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java
index d82d263..55b6b61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java
@@ -158,7 +158,7 @@ public class IgnitePdsSporadicDataRecordsOnBackupTest extends GridCommonAbstract
 
         params.bufferSize(1024 * 1024);
         params.filesOrDirs(walDir, walArchiveDir);
-        params.filter((type, pointer) -> type == WALRecord.RecordType.DATA_RECORD);
+        params.filter((type, pointer) -> type == WALRecord.RecordType.DATA_RECORD_V2);
 
         int cacheId = CU.cacheId(TX_CACHE_NAME);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index f829b90..a8dd46c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -358,7 +358,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
                     cctx.affinity().partition(i), i, new MvccVersionImpl(1000L, 10L, i + 1 /* Non-zero */)) :
                 new DataEntry(cctx.cacheId(), key, val, op, null, cctx.cache().nextVersion(),
                     0L,
-                    cctx.affinity().partition(i), i));
+                    cctx.affinity().partition(i), i, false));
         }
 
         UUID cpId = UUID.randomUUID();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 7567705..422cc23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -763,7 +763,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
                     new GridCacheVersion(0, 1, 1, 0),
                     0,
                     0,
-                    0
+                    0,
+                    false
                 )));
 
                 File walDir = U.field(walMgr, "walWorkDir");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index 005d491..5c6c7c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -91,7 +91,7 @@ import static java.util.Arrays.fill;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MVCC_DATA_RECORD;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
@@ -132,6 +132,12 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
     /** Whether to enable WAL archive compaction. */
     private boolean enableWalCompaction;
 
+    /** Backup count. */
+    private int backupCnt;
+
+    /** DataEntry from primary flag. */
+    private boolean primary = true;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -142,6 +148,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
         ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
         ccfg.setIndexedTypes(Integer.class, IndexedObject.class);
+        ccfg.setBackups(backupCnt);
 
         cfg.setCacheConfiguration(ccfg);
 
@@ -266,13 +273,16 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
                 WALRecord walRecord = tup.get2();
 
-                if (walRecord.type() == DATA_RECORD || walRecord.type() == MVCC_DATA_RECORD) {
+                if (walRecord.type() == DATA_RECORD_V2 || walRecord.type() == MVCC_DATA_RECORD) {
                     DataRecord record = (DataRecord)walRecord;
 
                     for (DataEntry entry : record.writeEntries()) {
                         KeyCacheObject key = entry.key();
                         CacheObject val = entry.value();
 
+                        if (walRecord.type() == DATA_RECORD_V2)
+                            assertEquals(primary, entry.primary());
+
                         if (DUMP_RECORDS)
                             log.info("Op: " + entry.op() + ", Key: " + key + ", Value: " + val);
                     }
@@ -1046,6 +1056,87 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     @Test
+    public void testPrimaryFlagOnTwoNodes() throws Exception {
+        backupCnt = 1;
+
+        IgniteEx ignite = startGrid("node0");
+        Ignite ignite1 = startGrid(1);
+
+        ignite.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, IndexedObject> cache = ignite.cache(CACHE_NAME);
+
+        backupCnt = 0;
+
+        int cntEntries = 100;
+
+        List<Integer> keys = findKeys(ignite.localNode(), cache, cntEntries, 0, 0);
+
+        Map<Integer, IndexedObject> map = new TreeMap<>();
+
+        for (Integer key : keys)
+            map.putIfAbsent(key, new IndexedObject(key));
+
+        cache.putAll(map);
+
+        ignite.cluster().active(false);
+
+        String subfolderName1 = genDbSubfolderName(ignite, 0);
+        String subfolderName2 = genDbSubfolderName(ignite1, 1);
+
+        stopAllGrids();
+
+        String workDir = U.defaultWorkDirectory();
+
+        IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+        Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class);
+
+        IgniteInClosure<DataRecord> drHnd = dataRecord -> {
+            List<? extends DataEntry> entries = dataRecord.writeEntries();
+
+            for (DataEntry entry : entries) {
+                GridCacheOperation op = entry.op();
+                Integer cnt = operationsFound.get(op);
+
+                operationsFound.put(op, cnt == null ? 1 : (cnt + 1));
+            }
+        };
+
+        scanIterateAndCount(
+            factory,
+            createIteratorParametersBuilder(workDir, subfolderName1)
+                .filesOrDirs(
+                    workDir + "/db/wal/" + subfolderName1,
+                    workDir + "/db/wal/archive/" + subfolderName1
+                ),
+            1,
+            1,
+            null, drHnd
+        );
+
+        primary = false;
+
+        scanIterateAndCount(
+            factory,
+            createIteratorParametersBuilder(workDir, subfolderName2)
+                .filesOrDirs(
+                    workDir + "/db/wal/" + subfolderName2,
+                    workDir + "/db/wal/archive/" + subfolderName2
+                ),
+            1,
+            1,
+            null,
+            drHnd
+        );
+    }
+
+    /**
+     * Tests transaction generation and WAL for putAll cache operation.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
     public void testPutAllTxIntoTwoNodes() throws Exception {
         Ignite ignite = startGrid("node0");
         Ignite ignite1 = startGrid(1);
@@ -1397,7 +1488,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
                 //noinspection EnumSwitchStatementWhichMissesCases
                 switch (type) {
-                    case DATA_RECORD:
+                    case DATA_RECORD_V2:
                         // Fallthrough.
                     case MVCC_DATA_RECORD: {
                         assert walRecord instanceof DataRecord;
@@ -1410,6 +1501,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
                         List<DataEntry> entries = dataRecord.writeEntries();
 
                         for (DataEntry entry : entries) {
+                            if (walRecord.type() == DATA_RECORD_V2)
+                                assertEquals(primary, entry.primary());
+
                             GridCacheVersion globalTxId = entry.nearXidVersion();
 
                             Object unwrappedKeyObj;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 27aa08a..41bf1ef 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -112,8 +112,10 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_PAGE_UPDATE_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V3;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.EXCHANGE;
@@ -170,6 +172,7 @@ public class RecordUtils {
             put(TX_RECORD, RecordUtils::buildTxRecord);
             put(PAGE_RECORD, RecordUtils::buildPageSnapshot);
             put(DATA_RECORD, RecordUtils::buildDataRecord);
+            put(DATA_RECORD_V2, RecordUtils::buildDataRecord);
             put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord);
             put(HEADER_RECORD, RecordUtils::buildHeaderRecord);
             put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord);
@@ -231,6 +234,7 @@ public class RecordUtils {
             put(ENCRYPTED_DATA_RECORD, RecordUtils::buildEncryptedDataRecord);
             put(ENCRYPTED_RECORD_V2, RecordUtils::buildEncryptedRecordV2);
             put(ENCRYPTED_DATA_RECORD_V2, RecordUtils::buildEncryptedDataRecordV2);
+            put(ENCRYPTED_DATA_RECORD_V3, RecordUtils::buildEncryptedDataRecordV3);
             put(MVCC_DATA_RECORD, RecordUtils::buildMvccDataRecord);
             put(MVCC_TX_RECORD, RecordUtils::buildMvccTxRecord);
             put(CONSISTENT_CUT, RecordUtils::buildConsistentCutRecord);
@@ -594,6 +598,11 @@ public class RecordUtils {
     }
 
     /** **/
+    public static UnsupportedWalRecord buildEncryptedDataRecordV3() {
+        return new UnsupportedWalRecord(ENCRYPTED_DATA_RECORD_V3);
+    }
+
+    /** **/
     public static MvccDataRecord buildMvccDataRecord() {
         return new MvccDataRecord(Collections.emptyList(), 1);
     }
diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java
index 49edc6c..25e404f 100644
--- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java
+++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java
@@ -65,7 +65,8 @@ class DataEntryWrapper extends DataEntry {
             dataEntry.writeVersion(),
             dataEntry.expireTime(),
             dataEntry.partitionId(),
-            dataEntry.partitionCounter()
+            dataEntry.partitionCounter(),
+            dataEntry.primary()
         );
 
         this.source = dataEntry;
diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
index 993ae88..cbed361 100644
--- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
+++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
@@ -127,7 +127,7 @@ public class WalStat {
 
         if (type == WALRecord.RecordType.PAGE_RECORD)
             registerPageSnapshot((PageSnapshot)record);
-        else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.MVCC_DATA_RECORD)
+        else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.DATA_RECORD_V2 || type == WALRecord.RecordType.MVCC_DATA_RECORD)
             registerDataRecord((DataRecord)record);
         else if (type == WALRecord.RecordType.TX_RECORD || type == WALRecord.RecordType.MVCC_TX_RECORD)
             registerTxRecord((TxRecord)record);
diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java
index b5c6976..928706f 100644
--- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java
+++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java
@@ -391,7 +391,7 @@ public class IgniteWalConverterArgumentsTest extends GridCommonAbstractTest {
             "binaryMetadataFileStoreDir=" + binaryMetadataDir.getAbsolutePath(),
             "marshallerMappingFileStoreDir=" + marshallerDir.getAbsolutePath(),
             "keepBinary=false",
-            "recordTypes=DATA_RECORD,TX_RECORD",
+            "recordTypes=DATA_RECORD_V2,TX_RECORD",
             "walTimeFromMillis=1575158400000",
             "walTimeToMillis=1577836740999",
             "recordContainsText=search string",
@@ -406,7 +406,7 @@ public class IgniteWalConverterArgumentsTest extends GridCommonAbstractTest {
         Assert.assertEquals(binaryMetadataDir, parseArgs.getBinaryMetadataFileStoreDir());
         Assert.assertEquals(marshallerDir, parseArgs.getMarshallerMappingFileStoreDir());
         Assert.assertFalse(parseArgs.isKeepBinary());
-        Assert.assertTrue(parseArgs.getRecordTypes().contains(WALRecord.RecordType.DATA_RECORD));
+        Assert.assertTrue(parseArgs.getRecordTypes().contains(WALRecord.RecordType.DATA_RECORD_V2));
         Assert.assertTrue(parseArgs.getRecordTypes().contains(WALRecord.RecordType.TX_RECORD));
         Assert.assertEquals(1575158400000L, (long)parseArgs.getFromTime());
         Assert.assertEquals(1577836740999L, (long)parseArgs.getToTime());
diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java
index bb4cab9..487f791 100644
--- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java
+++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java
@@ -298,7 +298,8 @@ public class IgniteWalConverterSensitiveDataTest extends GridCommonAbstractTest
             new GridCacheVersion(),
             0,
             0,
-            0
+            0,
+            false
         );
 
         byte[] sensitiveDataBytes = SENSITIVE_DATA_VALUE_PREFIX.getBytes(StandardCharsets.UTF_8);
diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java
index 896dc67..063e08f 100644
--- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java
+++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java
@@ -316,7 +316,7 @@ public class IgniteWalConverterTest extends GridCommonAbstractTest {
 
                     final int len = Integer.reverseBytes(raf.readInt());
 
-                    if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD.index()) {
+                    if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD_V2.index()) {
                         int i = 0;
 
                         int b;
@@ -426,7 +426,7 @@ public class IgniteWalConverterTest extends GridCommonAbstractTest {
                 if (recordTypeIndex > 0) {
                     recordTypeIndex--;
 
-                    if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD.index()) {
+                    if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD_V2.index()) {
                         find++;
 
                         if (find == 2) {