You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/08/04 13:08:02 UTC

[ignite-3] branch main updated: IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cdddfae611 IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408)
cdddfae611 is described below

commit cdddfae611f9e69d67743dcf5d3ca068a94f7479
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Aug 4 16:07:57 2023 +0300

    IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408)
---
 .idea/codeStyles/Project.xml                       |  3 +-
 .../apache/ignite/internal/schema/BinaryRow.java   | 16 -----
 .../ignite/internal/schema/BinaryRowImpl.java      | 11 +--
 .../ignite/internal/schema/ByteBufferRow.java      | 84 ----------------------
 .../org/apache/ignite/internal/schema/row/Row.java |  5 --
 .../sql/engine/exec/UpdatableTableImpl.java        | 53 +++++++-------
 .../exec/rel/TableScanNodeExecutionTest.java       |  3 +-
 .../AbstractMvPartitionStorageConcurrencyTest.java | 16 ++---
 .../internal/storage/BaseMvStoragesTest.java       |  5 +-
 .../storage/rocksdb/PartitionDataHelper.java       |  6 +-
 .../ignite/distributed/ItTablePersistenceTest.java | 21 +++---
 .../ignite/distributed/ReplicaUnavailableTest.java | 27 ++++---
 .../ignite/internal/table/ItColocationTest.java    | 13 ++--
 .../table/distributed/StorageUpdateHandler.java    | 16 ++---
 .../table/distributed/TableMessageGroup.java       |  6 ++
 .../distributed/command/UpdateAllCommand.java      |  4 +-
 .../table/distributed/command/UpdateCommand.java   | 13 +++-
 .../table/distributed/raft/PartitionListener.java  |  2 +-
 .../snapshot/incoming/IncomingSnapshotCopier.java  |  7 +-
 .../snapshot/message/SnapshotMvDataResponse.java   |  4 +-
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   | 24 ++++---
 ...owReplicaRequest.java => BinaryRowMessage.java} | 24 +++----
 .../request/MultipleRowReplicaRequest.java         | 12 ++--
 .../request/SingleRowReplicaRequest.java           |  6 +-
 .../replication/request/SwapRowReplicaRequest.java | 10 ++-
 .../replicator/PartitionReplicaListener.java       | 80 ++++++++++++---------
 .../distributed/storage/InternalTableImpl.java     | 79 ++++++++++----------
 .../internal/table/distributed/IndexBaseTest.java  | 32 ++++-----
 .../PartitionRaftCommandsSerializationTest.java    | 41 ++++++-----
 .../raft/PartitionCommandListenerTest.java         | 40 ++++++-----
 .../incoming/IncomingSnapshotCopierTest.java       | 11 ++-
 .../OutgoingSnapshotMvDataStreamingTest.java       | 79 +++++++++++---------
 .../PartitionReplicaListenerIndexLockingTest.java  | 12 +++-
 .../replication/PartitionReplicaListenerTest.java  | 61 ++++++++--------
 34 files changed, 387 insertions(+), 439 deletions(-)

diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
index 51cbc4108f..6f8eb3a91e 100644
--- a/.idea/codeStyles/Project.xml
+++ b/.idea/codeStyles/Project.xml
@@ -166,6 +166,7 @@
       <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
       <option name="TERNARY_OPERATION_WRAP" value="1" />
       <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+      <option name="KEEP_SIMPLE_LAMBDAS_IN_ONE_LINE" value="true" />
       <option name="FOR_STATEMENT_WRAP" value="1" />
       <option name="ARRAY_INITIALIZER_WRAP" value="1" />
       <option name="WRAP_COMMENTS" value="true" />
@@ -589,4 +590,4 @@
       </indentOptions>
     </codeStyleSettings>
   </code_scheme>
-</component>
+</component>
\ No newline at end of file
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index a519a48a23..336a1abcfd 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -18,23 +18,11 @@
 package org.apache.ignite.internal.schema;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
 /**
  * Binary row interface. Data layout is described in packages' {@code README.md}.
  */
 public interface BinaryRow {
-    ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
-
-    /** Size of schema version field. */
-    int SCHEMA_VERSION_FLD_LEN = Short.BYTES;
-
-    /** Row schema version field offset. */
-    int SCHEMA_VERSION_OFFSET = 0;
-
-    /** Row binary tuple field offset. */
-    int TUPLE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN;
-
     /** Get row schema version. */
     int schemaVersion();
 
@@ -46,8 +34,4 @@ public interface BinaryRow {
 
     /** Get ByteBuffer slice representing the binary tuple. */
     ByteBuffer tupleSlice();
-
-    /** Returns the representation of this row as a Byte Buffer. */
-    // TODO: remove this method, see https://issues.apache.org/jira/browse/IGNITE-19937
-    ByteBuffer byteBuffer();
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
index c7d41c452b..ee40a0a45f 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
@@ -52,16 +52,7 @@ public class BinaryRowImpl implements BinaryRow {
 
     @Override
     public ByteBuffer tupleSlice() {
-        return binaryTuple.duplicate().order(ORDER);
-    }
-
-    @Override
-    public ByteBuffer byteBuffer() {
-        return ByteBuffer.allocate(tupleSliceLength() + Short.BYTES)
-                .order(ORDER)
-                .putShort((short) schemaVersion())
-                .put(tupleSlice())
-                .rewind();
+        return binaryTuple.duplicate().order(BinaryTuple.ORDER);
     }
 
     @Override
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
deleted file mode 100644
index 2eb9ab6594..0000000000
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema;
-
-import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG;
-
-import java.nio.ByteBuffer;
-
-/**
- * Heap byte buffer-based row.
- */
-// TODO: remove this class, see https://issues.apache.org/jira/browse/IGNITE-19937
-public class ByteBufferRow implements BinaryRow {
-    /** Row buffer. */
-    private final ByteBuffer buf;
-
-    /**
-     * Constructor.
-     *
-     * @param data Array representation of the row.
-     */
-    public ByteBufferRow(byte[] data) {
-        this(ByteBuffer.wrap(data).order(ORDER));
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param buf Buffer representing the row.
-     */
-    public ByteBufferRow(ByteBuffer buf) {
-        assert buf.order() == ORDER;
-        assert buf.position() == 0;
-
-        this.buf = buf;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int schemaVersion() {
-        return Short.toUnsignedInt(buf.getShort(SCHEMA_VERSION_OFFSET));
-    }
-
-    @Override
-    public boolean hasValue() {
-        return (buf.get(TUPLE_OFFSET) & ROW_HAS_VALUE_FLAG) != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public ByteBuffer tupleSlice() {
-        try {
-            return buf.position(TUPLE_OFFSET).slice().order(ORDER);
-        } finally {
-            buf.position(0); // Reset bounds.
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public ByteBuffer byteBuffer() {
-        return buf.duplicate().order(ORDER);
-    }
-
-    @Override
-    public int tupleSliceLength() {
-        return buf.remaining() - TUPLE_OFFSET;
-    }
-}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index e3030422af..f8562916b9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -113,11 +113,6 @@ public class Row extends BinaryTupleReader implements BinaryRowEx, SchemaAware,
         return row.tupleSlice();
     }
 
-    @Override
-    public ByteBuffer byteBuffer() {
-        return row.byteBuffer();
-    }
-
     @Override
     public int tupleSliceLength() {
         return row.tupleSliceLength();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 310423aca4..2d15526ef8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -23,7 +23,6 @@ import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -47,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.SqlException;
@@ -78,7 +78,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
     /**
      * Constructor.
      *
-     * @param desc  Table descriptor.
+     * @param desc Table descriptor.
      */
     public UpdatableTableImpl(
             int tableId,
@@ -137,7 +137,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
             ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     .commitPartitionId(commitPartitionId)
-                    .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+                    .binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_UPSERT_ALL)
@@ -150,11 +150,16 @@ public final class UpdatableTableImpl implements UpdatableTable {
         return CompletableFuture.allOf(futures);
     }
 
-    private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow> rows) {
-        var result = new ArrayList<ByteBuffer>(rows.size());
+    private static List<BinaryRowMessage> serializeBinaryRows(Collection<BinaryRow> rows) {
+        var result = new ArrayList<BinaryRowMessage>(rows.size());
 
         for (BinaryRow row : rows) {
-            result.add(row.byteBuffer());
+            BinaryRowMessage message = MESSAGES_FACTORY.binaryRowMessage()
+                    .binaryTuple(row.tupleSlice())
+                    .schemaVersion(row.schemaVersion())
+                    .build();
+
+            result.add(message);
         }
 
         return result;
@@ -197,7 +202,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
             ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     .commitPartitionId(commitPartitionId)
-                    .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+                    .binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_INSERT_ALL)
@@ -205,26 +210,26 @@ public final class UpdatableTableImpl implements UpdatableTable {
                     .build();
 
             futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(), request)
-                .thenApply(result -> {
-                    Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result;
+                    .thenApply(result -> {
+                        Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result;
 
-                    if (binaryRows.isEmpty()) {
-                        return List.of();
-                    }
+                        if (binaryRows.isEmpty()) {
+                            return List.of();
+                        }
 
-                    List<RowT> conflictRows = new ArrayList<>(binaryRows.size());
-                    IgniteTypeFactory typeFactory = ectx.getTypeFactory();
-                    RowHandler.RowFactory<RowT> rowFactory = handler.factory(
-                            ectx.getTypeFactory(),
-                            desc.insertRowType(typeFactory)
-                    );
+                        List<RowT> conflictRows = new ArrayList<>(binaryRows.size());
+                        IgniteTypeFactory typeFactory = ectx.getTypeFactory();
+                        RowHandler.RowFactory<RowT> rowFactory = handler.factory(
+                                ectx.getTypeFactory(),
+                                desc.insertRowType(typeFactory)
+                        );
 
-                    for (BinaryRow row : binaryRows) {
-                        conflictRows.add(rowConverter.toRow(ectx, row, rowFactory, null));
-                    }
+                        for (BinaryRow row : binaryRows) {
+                            conflictRows.add(rowConverter.toRow(ectx, row, rowFactory, null));
+                        }
 
-                    return conflictRows;
-                });
+                        return conflictRows;
+                    });
         }
 
         return handleInsertResults(handler, futures);
@@ -260,7 +265,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
             ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(partGroupId)
                     .commitPartitionId(commitPartitionId)
-                    .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+                    .binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
                     .transactionId(txAttributes.id())
                     .term(nodeWithTerm.term())
                     .requestType(RequestType.RW_DELETE_ALL)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index da376fd68c..eca7bca58a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl;
@@ -133,7 +132,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest {
 
         private final int dataAmount;
 
-        private final ByteBufferRow bbRow = new ByteBufferRow(new byte[1]);
+        private final BinaryRow bbRow = mock(BinaryRow.class);
 
         private final CopyOnWriteArraySet<Integer> partitions = new CopyOnWriteArraySet<>();
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 03051669e1..53c9cc25e8 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.storage;
 
-import static java.util.stream.Collectors.toCollection;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -25,10 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.jetbrains.annotations.Nullable;
@@ -203,13 +200,14 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
             addAndCommit.perform(this, null);
 
-            Collection<ByteBuffer> rows = Stream.of(TABLE_ROW, TABLE_ROW2)
-                    .map(BinaryRow::byteBuffer)
-                    .collect(toCollection(ConcurrentLinkedQueue::new));
+            Collection<BinaryRow> rows = new ConcurrentLinkedQueue<>();
+
+            rows.add(TABLE_ROW);
+            rows.add(TABLE_ROW2);
 
             runRace(
-                    () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(), rows),
-                    () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(), rows)
+                    () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow(), rows),
+                    () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow(), rows)
             );
 
             assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
@@ -220,7 +218,7 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
         }
     }
 
-    private void assertRemoveRow(ByteBuffer rowBytes, Collection<ByteBuffer> rows) {
+    private static void assertRemoveRow(@Nullable BinaryRow rowBytes, Collection<BinaryRow> rows) {
         assertNotNull(rowBytes);
 
         assertTrue(rows.remove(rowBytes), rowBytes.toString());
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 9a751332e5..b879bdfdb3 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -77,7 +78,9 @@ public abstract class BaseMvStoragesTest {
 
     protected static BinaryRow binaryRow(TestKey key, TestValue value) {
         try {
-            return kvMarshaller.marshal(key, value);
+            Row row = kvMarshaller.marshal(key, value);
+
+            return new BinaryRowImpl(row.schemaVersion(), row.tupleSlice());
         } catch (MarshallerException e) {
             throw new IgniteException(e);
         }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index 9ea674b5e8..6c24c08689 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -36,7 +36,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowImpl;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RowId;
@@ -77,8 +77,6 @@ public final class PartitionDataHelper implements ManuallyCloseable {
     /** Value offset (if transaction state is present). */
     static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
 
-    static final ByteOrder TABLE_ROW_BYTE_ORDER = ByteBufferRow.ORDER;
-
     /** Thread-local direct buffer instance to read keys from RocksDB. */
     static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
 
@@ -260,7 +258,7 @@ public final class PartitionDataHelper implements ManuallyCloseable {
         assert buffer.order() == ByteOrder.BIG_ENDIAN;
 
         int schemaVersion = Short.toUnsignedInt(buffer.getShort());
-        ByteBuffer binaryTupleSlice = buffer.slice().order(TABLE_ROW_BYTE_ORDER);
+        ByteBuffer binaryTupleSlice = buffer.slice().order(BinaryTuple.ORDER);
 
         return new BinaryRowImpl(schemaVersion, binaryTupleSlice);
     }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index f3c99a37ab..86ca7097e1 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -226,8 +227,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                     int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
                     MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex);
 
-                    Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage);
-                    RowId rowId = primaryIndex.get(req0.binaryRowBytes());
+                    Map<BinaryRow, RowId> primaryIndex = rowsToRowIds(partitionStorage);
+                    RowId rowId = primaryIndex.get(req0.binaryRow());
 
                     BinaryRow row = partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
 
@@ -235,13 +236,13 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                 }
 
                 // Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE.
-                ByteBuffer binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowBytes() : null;
+                BinaryRowMessage binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowMessage() : null;
 
                 UpdateCommand cmd = msgFactory.updateCommand()
                         .txId(req0.transactionId())
                         .tablePartitionId(tablePartitionId(new TablePartitionId(1, 0)))
                         .rowUuid(new RowId(0).uuid())
-                        .rowBuffer(binaryRow)
+                        .rowMessage(binaryRow)
                         .safeTimeLong(hybridClock.nowLong())
                         .build();
 
@@ -303,11 +304,11 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
         MvPartitionStorage storage = getListener(restarted, raftGroupId()).getMvStorage();
 
         return () -> {
-            Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage);
+            Map<BinaryRow, RowId> primaryIndex = rowsToRowIds(storage);
 
             Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
 
-            RowId rowId = primaryIndex.get(value.byteBuffer());
+            RowId rowId = primaryIndex.get(new BinaryRowImpl(value.schemaVersion(), value.tupleSlice()));
 
             if (rowId == null) {
                 return false;
@@ -323,8 +324,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
         };
     }
 
-    private static Map<ByteBuffer, RowId> rowsToRowIds(MvPartitionStorage storage) {
-        Map<ByteBuffer, RowId> result = new HashMap<>();
+    private static Map<BinaryRow, RowId> rowsToRowIds(MvPartitionStorage storage) {
+        Map<BinaryRow, RowId> result = new HashMap<>();
 
         RowId rowId = storage.closestRowId(RowId.lowestRowId(0));
 
@@ -332,7 +333,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
             BinaryRow binaryRow = storage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
 
             if (binaryRow != null) {
-                result.put(binaryRow.byteBuffer(), rowId);
+                result.put(binaryRow, rowId);
             }
 
             RowId incremented = rowId.increment();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 51a589ae8d..e7dab29686 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -28,11 +28,11 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
 import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -51,12 +51,14 @@ import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -149,7 +151,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRowBytes(createKeyValueRow(1L, 1L))
+                .binaryRowMessage(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -191,7 +193,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRowBytes(createKeyValueRow(1L, 1L))
+                .binaryRowMessage(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -223,7 +225,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRowBytes(createKeyValueRow(1L, 1L))
+                .binaryRowMessage(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -242,10 +244,10 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
             e1 = e;
         }
 
-        assertTrue(e0 != null);
+        assertNotNull(e0);
         assertTrue(unwrapCause(e0) instanceof ReplicationException, e0.toString());
 
-        assertTrue(e1 != null);
+        assertNotNull(e1);
         assertTrue(unwrapCause(e1) instanceof ReplicationException, e1.toString());
     }
 
@@ -280,7 +282,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRowBytes(createKeyValueRow(1L, 1L))
+                .binaryRowMessage(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -292,17 +294,22 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
             e0 = e;
         }
 
-        assertTrue(e0 != null);
+        assertNotNull(e0);
         assertTrue(unwrapCause(e0) instanceof ReplicationTimeoutException, e0.toString());
         assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException) unwrapCause(e0)).code());
     }
 
-    private static ByteBuffer createKeyValueRow(long id, long value) {
+    private BinaryRowMessage createKeyValueRow(long id, long value) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA);
 
         rowBuilder.appendLong(id);
         rowBuilder.appendLong(value);
 
-        return rowBuilder.build().byteBuffer();
+        BinaryRow row = rowBuilder.build();
+
+        return tableMessagesFactory.binaryRowMessage()
+                .binaryTuple(row.tupleSlice())
+                .schemaVersion(row.schemaVersion())
+                .build();
     }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index b412c75a3f..0f5fa3bf5d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -35,7 +35,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -64,7 +63,6 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
@@ -78,6 +76,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -197,7 +196,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
             RaftGroupService r = groupRafts.get(request.groupId());
 
             if (request instanceof ReadWriteMultiRowReplicaRequest) {
-                Map<UUID, ByteBuffer> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowsBytes()
+                Map<UUID, BinaryRowMessage> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowMessages()
                         .stream()
                         .collect(toMap(row -> TestTransactionIds.newTransactionId(), Function.identity()));
 
@@ -221,7 +220,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
                                         .build()
                         )
                         .rowUuid(UUID.randomUUID())
-                        .rowBuffer(((ReadWriteSingleRowReplicaRequest) request).binaryRow().byteBuffer())
+                        .rowMessage(((ReadWriteSingleRowReplicaRequest) request).binaryRowMessage())
                         .txId(TestTransactionIds.newTransactionId())
                         .build());
             }
@@ -394,14 +393,14 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
             partsMap.merge(part, 1, (cnt, ignore) -> ++cnt);
         }
 
-        assertEquals(partsMap.size(), CMDS_MAP.size());
+        assertEquals(CMDS_MAP.size(), partsMap.size());
 
         CMDS_MAP.forEach((p, set) -> {
             UpdateAllCommand cmd = (UpdateAllCommand) CollectionUtils.first(set);
             assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> "part=" + p + ", set=" + set);
 
-            cmd.rowsToUpdate().values().forEach(byteBuffer -> {
-                Row r = new Row(schema, new ByteBufferRow(byteBuffer));
+            cmd.rowsToUpdate().values().forEach(rowMessage -> {
+                Row r = new Row(schema, rowMessage.asBinaryRow());
 
                 assertEquals(INT_TABLE.partition(r), p);
             });
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 110be7852c..146ff6620a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.table.distributed;
 
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -32,13 +31,13 @@ import java.util.function.Consumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -102,20 +101,19 @@ public class StorageUpdateHandler {
      * @param txId Transaction id.
      * @param rowUuid Row UUID.
      * @param commitPartitionId Commit partition id.
-     * @param rowBuffer Row buffer.
+     * @param row Row.
      * @param onApplication Callback on application.
      */
     public void handleUpdate(
             UUID txId,
             UUID rowUuid,
             TablePartitionId commitPartitionId,
-            @Nullable ByteBuffer rowBuffer,
+            @Nullable BinaryRow row,
             @Nullable Consumer<RowId> onApplication
     ) {
         indexUpdateHandler.waitIndexes();
 
         storage.runConsistently(locker -> {
-            BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : null;
             RowId rowId = new RowId(partitionId, rowUuid);
             int commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
@@ -151,7 +149,7 @@ public class StorageUpdateHandler {
      */
     public void handleUpdateAll(
             UUID txId,
-            Map<UUID, ByteBuffer> rowsToUpdate,
+            Map<UUID, BinaryRowMessage> rowsToUpdate,
             TablePartitionId commitPartitionId,
             @Nullable Consumer<Collection<RowId>> onReplication
     ) {
@@ -165,11 +163,11 @@ public class StorageUpdateHandler {
                 List<RowId> rowIds = new ArrayList<>();
 
                 // Sort IDs to prevent deadlock. Natural UUID order matches RowId order within the same partition.
-                SortedMap<UUID, ByteBuffer> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate);
+                SortedMap<UUID, BinaryRowMessage> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate);
 
-                for (Map.Entry<UUID, ByteBuffer> entry : sortedRowsToUpdateMap.entrySet()) {
+                for (Map.Entry<UUID, BinaryRowMessage> entry : sortedRowsToUpdateMap.entrySet()) {
                     RowId rowId = new RowId(partitionId, entry.getKey());
-                    BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
+                    BinaryRow row = entry.getValue() == null ? null : entry.getValue().asBinaryRow();
 
                     locker.lock(rowId);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 4ccb9ab6e9..667a58fde0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -143,6 +144,11 @@ public interface TableMessageGroup {
      */
     short BINARY_TUPLE = 17;
 
+    /**
+     * Message type for {@link BinaryRowMessage}.
+     */
+    short BINARY_ROW_MESSAGE = 18;
+
     /**
      * Message types for Table module RAFT commands.
      */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 9654deff74..4755ac0502 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -30,5 +30,5 @@ import org.apache.ignite.network.annotations.Transferable;
 public interface UpdateAllCommand extends PartitionCommand {
     TablePartitionIdMessage tablePartitionId();
 
-    Map<UUID, ByteBuffer> rowsToUpdate();
+    Map<UUID, BinaryRowMessage> rowsToUpdate();
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 358432d1d7..0515214a55 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -17,9 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
@@ -33,5 +34,13 @@ public interface UpdateCommand extends PartitionCommand {
     UUID rowUuid();
 
     @Nullable
-    ByteBuffer rowBuffer();
+    BinaryRowMessage rowMessage();
+
+    /** Returns the row to update or {@code null} if the row should be removed. */
+    @Nullable
+    default BinaryRow row() {
+        BinaryRowMessage message = rowMessage();
+
+        return message == null ? null : message.asBinaryRow();
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index b3746397c0..41c1ee0d3c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -231,7 +231,7 @@ public class PartitionListener implements RaftGroupListener {
             return;
         }
 
-        storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(),
+        storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
                 rowId -> {
                     txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new TreeSet<>()).add(rowId);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 0dacd347ac..fd5a9a3c37 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -21,7 +21,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -31,7 +30,6 @@ import java.util.function.Function;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
@@ -46,6 +44,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
@@ -393,9 +392,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
     private void writeVersion(ResponseEntry entry, int i) {
         RowId rowId = new RowId(partId(), entry.rowId());
 
-        ByteBuffer rowVersion = entry.rowVersions().get(i);
+        BinaryRowMessage rowVersion = entry.rowVersions().get(i);
 
-        BinaryRow binaryRow = rowVersion == null ? null : new ByteBufferRow(rowVersion.rewind());
+        BinaryRow binaryRow = rowVersion == null ? null : rowVersion.asBinaryRow();
 
         PartitionAccess partition = partitionSnapshotStorage.partition();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java
index 523b0f4834..ca115fdb24 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.message;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.table.TableRow;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
@@ -48,7 +48,7 @@ public interface SnapshotMvDataResponse extends NetworkMessage {
         UUID rowId();
 
         /** List of {@link TableRow}s for a given {@link #rowId()}. */
-        List<ByteBuffer> rowVersions();
+        List<BinaryRowMessage> rowVersions();
 
         /**
          * List of commit timestamps for all committed versions. Might be smaller than {@link #rowVersions()} if there's a write-intent
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index 5e947e32dc..82544f9f06 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -264,12 +264,13 @@ public class OutgoingSnapshot {
         return totalBytesAfter;
     }
 
-    private static long rowSizeInBytes(List<ByteBuffer> rowVersions) {
+    private static long rowSizeInBytes(List<BinaryRowMessage> rowVersions) {
         long sum = 0;
 
-        for (ByteBuffer buf : rowVersions) {
-            if (buf != null) {
-                sum += buf.remaining();
+        for (BinaryRowMessage rowMessage : rowVersions) {
+            if (rowMessage != null) {
+                // Schema version is an unsigned short.
+                sum += rowMessage.binaryTuple().remaining() + Short.BYTES;
             }
         }
 
@@ -318,7 +319,7 @@ public class OutgoingSnapshot {
         }
 
         int count = rowVersionsN2O.size();
-        List<ByteBuffer> buffers = new ArrayList<>(count);
+        List<BinaryRowMessage> rowVersions = new ArrayList<>(count);
 
         int commitTimestampsCount = rowVersionsN2O.get(0).isWriteIntent() ? count - 1 : count;
         long[] commitTimestamps = new long[commitTimestampsCount];
@@ -331,7 +332,14 @@ public class OutgoingSnapshot {
             ReadResult version = rowVersionsN2O.get(i);
             BinaryRow row = version.binaryRow();
 
-            buffers.add(row == null ? null : row.byteBuffer());
+            BinaryRowMessage rowMessage = row == null
+                    ? null
+                    : MESSAGES_FACTORY.binaryRowMessage()
+                            .binaryTuple(row.tupleSlice())
+                            .schemaVersion(row.schemaVersion())
+                            .build();
+
+            rowVersions.add(rowMessage);
 
             if (version.isWriteIntent()) {
                 assert i == 0 : rowVersionsN2O;
@@ -346,7 +354,7 @@ public class OutgoingSnapshot {
 
         return MESSAGES_FACTORY.responseEntry()
                 .rowId(rowId.uuid())
-                .rowVersions(buffers)
+                .rowVersions(rowVersions)
                 .timestamps(commitTimestamps)
                 .txId(transactionId)
                 .commitTableId(commitTableId)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java
similarity index 63%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java
index 98a4656483..3bb8cd8057 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java
@@ -18,22 +18,22 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import java.nio.ByteBuffer;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Single-row replica request.
+ * Message for transferring a {@link BinaryRow}.
  */
-public interface SingleRowReplicaRequest extends ReplicaRequest {
-    ByteBuffer binaryRowBytes();
+@Transferable(TableMessageGroup.BINARY_ROW_MESSAGE)
+public interface BinaryRowMessage extends NetworkMessage {
+    ByteBuffer binaryTuple();
 
-    default BinaryRow binaryRow() {
-        return new ByteBufferRow(binaryRowBytes());
-    }
+    int schemaVersion();
 
-    @Marshallable
-    RequestType requestType();
+    default BinaryRow asBinaryRow() {
+        return new BinaryRowImpl(schemaVersion(), binaryTuple());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
index 5b7c39b720..415f73253f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -30,18 +28,18 @@ import org.apache.ignite.network.annotations.Marshallable;
  * Multiple row replica request.
  */
 public interface MultipleRowReplicaRequest extends ReplicaRequest {
-    Collection<ByteBuffer> binaryRowsBytes();
+    Collection<BinaryRowMessage> binaryRowMessages();
 
     /**
      * Deserializes binary row byte buffers into binary rows.
      */
     default Collection<BinaryRow> binaryRows() {
-        Collection<ByteBuffer> binaryRowsBytes = binaryRowsBytes();
+        Collection<BinaryRowMessage> binaryRowMessages = binaryRowMessages();
 
-        var result = new ArrayList<BinaryRow>(binaryRowsBytes.size());
+        var result = new ArrayList<BinaryRow>(binaryRowMessages.size());
 
-        for (ByteBuffer buffer : binaryRowsBytes) {
-            result.add(new ByteBufferRow(buffer));
+        for (BinaryRowMessage message : binaryRowMessages) {
+            result.add(message.asBinaryRow());
         }
 
         return result;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
index 98a4656483..a2f033cf0e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -28,10 +26,10 @@ import org.apache.ignite.network.annotations.Marshallable;
  * Single-row replica request.
  */
 public interface SingleRowReplicaRequest extends ReplicaRequest {
-    ByteBuffer binaryRowBytes();
+    BinaryRowMessage binaryRowMessage();
 
     default BinaryRow binaryRow() {
-        return new ByteBufferRow(binaryRowBytes());
+        return binaryRowMessage().asBinaryRow();
     }
 
     @Marshallable
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
index d088c702d3..1b5a82f6ec 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -28,16 +26,16 @@ import org.apache.ignite.network.annotations.Marshallable;
  * Dual row replica request.
  */
 public interface SwapRowReplicaRequest extends ReplicaRequest {
-    ByteBuffer binaryRowBytes();
+    BinaryRowMessage binaryRowMessage();
 
     default BinaryRow binaryRow() {
-        return new ByteBufferRow(binaryRowBytes());
+        return binaryRowMessage().asBinaryRow();
     }
 
-    ByteBuffer oldBinaryRowBytes();
+    BinaryRowMessage oldBinaryRowMessage();
 
     default BinaryRow oldBinaryRow() {
-        return new ByteBufferRow(oldBinaryRowBytes());
+        return oldBinaryRowMessage().asBinaryRow();
     }
 
     @Marshallable
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5f44953212..1979ef7730 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
@@ -141,6 +142,7 @@ import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
@@ -924,13 +926,13 @@ public class PartitionReplicaListener implements ReplicaListener {
 
             return committedReadResult.binaryRow();
         })
-        .thenComposeAsync(resolvedReadResult -> {
-            if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
-                result.add(resolvedReadResult);
-            }
+                .thenComposeAsync(resolvedReadResult -> {
+                    if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
+                        result.add(resolvedReadResult);
+                    }
 
-            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
-        }, scanRequestExecutor);
+                    return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
+                }, scanRequestExecutor);
     }
 
     /**
@@ -1404,9 +1406,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                 checkWriteIntentsBelongSameTx(writeIntents);
 
                 return resolveTxState(
-                                new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()),
-                                writeIntent.transactionId(),
-                                ts)
+                        new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()),
+                        writeIntent.transactionId(),
+                        ts)
                         .thenApply(readLastCommitted -> {
                             if (readLastCommitted) {
                                 for (ReadResult wi : writeIntents) {
@@ -1468,7 +1470,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     /**
      * Tests row values for equality.
      *
-     * @param row  Row.
+     * @param row Row.
      * @param row2 Row.
      * @return {@code true} if rows are equal.
      */
@@ -1533,7 +1535,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 }
 
                 return allOf(rowIdLockFuts).thenCompose(ignore -> {
-                    Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>();
+                    Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
                     Collection<BinaryRow> result = new ArrayList<>();
 
                     int futNum = 0;
@@ -1572,7 +1574,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 }
 
                 return allOf(deleteExactLockFuts).thenCompose(ignore -> {
-                    Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>();
+                    Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
                     Collection<BinaryRow> result = new ArrayList<>();
 
                     int futNum = 0;
@@ -1642,10 +1644,14 @@ public class PartitionReplicaListener implements ReplicaListener {
                         insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
                     }
 
-                    Map<UUID, ByteBuffer> convertedMap = rowsToInsert.entrySet().stream().collect(
-                            Collectors.toMap(
+                    Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
+                            .collect(Collectors.toMap(
                                     e -> e.getKey().uuid(),
-                                    e -> e.getValue().byteBuffer()));
+                                    e -> MSG_FACTORY.binaryRowMessage()
+                                            .binaryTuple(e.getValue().tupleSlice())
+                                            .schemaVersion(e.getValue().schemaVersion())
+                                            .build()
+                            ));
 
                     return allOf(insertLockFuts)
                             .thenCompose(ignored -> applyUpdateAllCommand(
@@ -1679,14 +1685,14 @@ public class PartitionReplicaListener implements ReplicaListener {
                 }
 
                 return allOf(rowIdFuts).thenCompose(ignore -> {
-                    Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>();
+                    Map<UUID, BinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(request.binaryRowMessages().size());
 
                     int futNum = 0;
 
-                    for (BinaryRow row : request.binaryRows()) {
+                    for (BinaryRowMessage row : request.binaryRowMessages()) {
                         RowId lockedRow = rowIdFuts[futNum++].join().get1();
 
-                        rowsToUpdate.put(lockedRow.uuid(), row.byteBuffer());
+                        rowsToUpdate.put(lockedRow.uuid(), row);
                     }
 
                     if (rowsToUpdate.isEmpty()) {
@@ -1747,7 +1753,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 cmd.txId(),
                 cmd.rowUuid(),
                 cmd.tablePartitionId().asTablePartitionId(),
-                cmd.rowBuffer(),
+                cmd.row(),
                 rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
                     if (v == null) {
                         v = new TreeSet<>();
@@ -1963,7 +1969,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     /**
      * Takes all required locks on a key, before upserting.
      *
-     * @param txId      Transaction id.
+     * @param txId Transaction id.
      * @return Future completes with tuple {@link RowId} and collection of {@link Lock}.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
@@ -2035,7 +2041,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     /**
      * Takes all required locks on a key, before deleting the value.
      *
-     * @param txId      Transaction id.
+     * @param txId Transaction id.
      * @return Future completes with {@link RowId} or {@code null} if there is no value for remove.
      */
     private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
@@ -2055,7 +2061,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     /**
      * Takes all required locks on a key, before deleting the value.
      *
-     * @param txId      Transaction id.
+     * @param txId Transaction id.
      * @return Future completes with {@link RowId} or {@code null} if there is no value for the key.
      */
     private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, RowId rowId, UUID txId) {
@@ -2068,7 +2074,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     /**
      * Takes all required locks on a key, before getting the value.
      *
-     * @param txId      Transaction id.
+     * @param txId Transaction id.
      * @return Future completes with {@link RowId} or {@code null} if there is no value for the key.
      */
     private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
@@ -2124,7 +2130,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     /**
      * Takes all required locks on a key, before updating the value.
      *
-     * @param txId      Transaction id.
+     * @param txId Transaction id.
      * @return Future completes with tuple {@link RowId} and collection of {@link Lock} or {@code null} if there is no suitable row.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow,
@@ -2284,8 +2290,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     }
 
     /**
-     * Resolves a read result to the matched row.
-     * If the result does not match any row, the method returns a future to {@code null}.
+     * Resolves a read result to the matched row. If the result does not match any row, the method returns a future to {@code null}.
      *
      * @param readResult Read result.
      * @param timestamp Timestamp.
@@ -2298,9 +2303,9 @@ public class PartitionReplicaListener implements ReplicaListener {
             Supplier<BinaryRow> lastCommitted
     ) {
         return resolveTxState(
-                        new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()),
-                        readResult.transactionId(),
-                        timestamp)
+                new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()),
+                readResult.transactionId(),
+                timestamp)
                 .thenApply(readLastCommitted -> {
                     if (readLastCommitted) {
                         return lastCommitted.get();
@@ -2371,7 +2376,12 @@ public class PartitionReplicaListener implements ReplicaListener {
                 .safeTimeLong(hybridClock.nowLong());
 
         if (row != null) {
-            bldr.rowBuffer(row.byteBuffer());
+            BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
+                    .binaryTuple(row.tupleSlice())
+                    .schemaVersion(row.schemaVersion())
+                    .build();
+
+            bldr.rowMessage(rowMessage);
         }
 
         return bldr.build();
@@ -2385,7 +2395,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param txId Transaction ID.
      * @return Constructed {@link UpdateAllCommand} object.
      */
-    private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) {
+    private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId) {
         return MSG_FACTORY.updateAllCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowsToUpdate(rowsToUpdate)
@@ -2418,8 +2428,8 @@ public class PartitionReplicaListener implements ReplicaListener {
     }
 
     /**
-     * Class that stores a list of futures for operations that has happened in a specific transaction.
-     * Also, the class has a property {@code state} that represents a transaction state.
+     * Class that stores a list of futures for operations that have happened in a specific transaction. Also, the class has a property
+     * {@code state} that represents a transaction state.
      */
     private static class TxCleanupReadyFutureList {
         /**
@@ -2428,8 +2438,8 @@ public class PartitionReplicaListener implements ReplicaListener {
         final Map<RequestType, List<CompletableFuture<?>>> futures = new EnumMap<>(RequestType.class);
 
         /**
-         * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states.
-         * If the property is {@code null} the transaction is in pending state.
+         * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states. If the property is
+         * {@code null} the transaction is in pending state.
          */
         TxState state;
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 94aa2eadd5..ce3b671430 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -32,7 +32,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.net.ConnectException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -74,7 +73,9 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
@@ -237,9 +238,9 @@ public class InternalTableImpl implements InternalTable {
             );
         }
 
-        final boolean implicit = tx == null;
+        boolean implicit = tx == null;
 
-        final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+        InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
         int partId = partitionId(row);
 
@@ -300,7 +301,7 @@ public class InternalTableImpl implements InternalTable {
             );
         }
 
-        final boolean implicit = tx == null;
+        boolean implicit = tx == null;
 
         // It's possible to have null txState if transaction isn't started yet.
         if (!implicit && !(tx.state() == TxState.PENDING || tx.state() == null)) {
@@ -308,7 +309,7 @@ public class InternalTableImpl implements InternalTable {
                     "The operation is attempted for completed transaction"));
         }
 
-        final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+        InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
         Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
 
@@ -532,7 +533,7 @@ public class InternalTableImpl implements InternalTable {
                     tx,
                     (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                             .groupId(groupId)
-                            .binaryRowBytes(keyRow.byteBuffer())
+                            .binaryRowMessage(serializeBinaryRow(keyRow))
                             .commitPartitionId(commitPart)
                             .transactionId(txo.id())
                             .term(term)
@@ -554,7 +555,7 @@ public class InternalTableImpl implements InternalTable {
 
         return replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlySingleRowReplicaRequest()
                 .groupId(partGroupId)
-                .binaryRowBytes(keyRow.byteBuffer())
+                .binaryRowMessage(serializeBinaryRow(keyRow))
                 .requestType(RequestType.RO_GET)
                 .readTimestampLong(readTimestamp.longValue())
                 .build()
@@ -579,7 +580,7 @@ public class InternalTableImpl implements InternalTable {
                     tx,
                     (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
                             .groupId(groupId)
-                            .binaryRowsBytes(serializeBinaryRows(keyRows0))
+                            .binaryRowMessages(serializeBinaryRows(keyRows0))
                             .commitPartitionId(commitPart)
                             .transactionId(txo.id())
                             .term(term)
@@ -603,30 +604,36 @@ public class InternalTableImpl implements InternalTable {
         for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
             ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partitionRowBatch.getIntKey()).groupId();
 
-            CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlyMultiRowReplicaRequest()
+            ReadOnlyMultiRowReplicaRequest request = tableMessagesFactory.readOnlyMultiRowReplicaRequest()
                     .groupId(partGroupId)
-                    .binaryRowsBytes(serializeBinaryRows(partitionRowBatch.getValue().requestedRows))
+                    .binaryRowMessages(serializeBinaryRows(partitionRowBatch.getValue().requestedRows))
                     .requestType(RequestType.RO_GET_ALL)
                     .readTimestampLong(readTimestamp.longValue())
-                    .build()
-            );
+                    .build();
 
-            partitionRowBatch.getValue().resultFuture = fut;
+            partitionRowBatch.getValue().resultFuture = replicaSvc.invoke(recipientNode, request);
         }
 
         return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
     }
 
-    private static List<ByteBuffer> serializeBinaryRows(Collection<? extends BinaryRow> rows) {
-        var result = new ArrayList<ByteBuffer>(rows.size());
+    private List<BinaryRowMessage> serializeBinaryRows(Collection<? extends BinaryRow> rows) {
+        var result = new ArrayList<BinaryRowMessage>(rows.size());
 
         for (BinaryRow row : rows) {
-            result.add(row.byteBuffer());
+            result.add(serializeBinaryRow(row));
         }
 
         return result;
     }
 
+    private BinaryRowMessage serializeBinaryRow(BinaryRow row) {
+        return tableMessagesFactory.binaryRowMessage()
+                .binaryTuple(row.tupleSlice())
+                .schemaVersion(row.schemaVersion())
+                .build();
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction tx) {
@@ -636,7 +643,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(row.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_UPSERT)
@@ -680,7 +687,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(row.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_UPSERT)
@@ -698,7 +705,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(row.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT)
@@ -716,7 +723,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
+                        .binaryRowMessages(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_INSERT_ALL)
@@ -735,7 +742,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(row.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE_IF_EXIST)
@@ -753,8 +760,8 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSwapRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .oldBinaryRowBytes(oldRow.byteBuffer())
-                        .binaryRowBytes(newRow.byteBuffer())
+                        .oldBinaryRowMessage(serializeBinaryRow(oldRow))
+                        .binaryRowMessage(serializeBinaryRow(newRow))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_REPLACE)
@@ -772,7 +779,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(row.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_REPLACE)
@@ -790,7 +797,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(keyRow.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(keyRow))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE)
@@ -808,7 +815,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(oldRow.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(oldRow))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT)
@@ -826,7 +833,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowBytes(row.byteBuffer())
+                        .binaryRowMessage(serializeBinaryRow(row))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_GET_AND_DELETE)
@@ -844,7 +851,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
+                        .binaryRowMessages(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_ALL)
@@ -866,7 +873,7 @@ public class InternalTableImpl implements InternalTable {
                 (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
                         .groupId(groupId)
                         .commitPartitionId(commitPart)
-                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
+                        .binaryRowMessages(serializeBinaryRows(keyRows0))
                         .transactionId(txo.id())
                         .term(term)
                         .requestType(RequestType.RW_DELETE_EXACT_ALL)
@@ -982,18 +989,16 @@ public class InternalTableImpl implements InternalTable {
         // won't be rolled back automatically - it's up to the user or outer engine.
         if (tx != null && tx.isReadOnly()) {
             throw new TransactionException(
-                    new TransactionException(
-                            TX_FAILED_READ_WRITE_OPERATION_ERR,
-                            "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + '}'
-                    )
+                    TX_FAILED_READ_WRITE_OPERATION_ERR,
+                    "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + '}'
             );
         }
 
         validatePartitionIndex(partId);
 
-        final boolean implicit = tx == null;
+        boolean implicit = tx == null;
 
-        final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+        InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
         return new PartitionScanPublisher(
                 (scanId, batchSize) -> enlistCursorInTx(
@@ -1346,7 +1351,7 @@ public class InternalTableImpl implements InternalTable {
         Function<CompletableFuture<Void>, CompletableFuture<Void>> onClose;
 
         /** True when the publisher has a subscriber, false otherwise. */
-        private AtomicBoolean subscribed;
+        private final AtomicBoolean subscribed;
 
         /**
          * The constructor.
@@ -1623,7 +1628,7 @@ public class InternalTableImpl implements InternalTable {
         return tableMessagesFactory.readWriteMultiRowReplicaRequest()
                 .groupId(groupId)
                 .commitPartitionId(commitPart)
-                .binaryRowsBytes(serializeBinaryRows(keyRows0))
+                .binaryRowMessages(serializeBinaryRows(keyRows0))
                 .transactionId(txo.id())
                 .term(term)
                 .requestType(RequestType.RW_UPSERT_ALL)
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index e5eb1e768b..7307f934e4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -55,13 +56,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
- * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and
- * three indexes: primary key, hash index over value columns and sorted index over value columns.
+ * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and three indexes: primary key, hash index over
+ * value columns and sorted index over value columns.
  */
 @ExtendWith(ConfigurationExtension.class)
 public abstract class IndexBaseTest extends BaseMvStoragesTest {
     protected static final int PARTITION_ID = 0;
 
+    private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
+
     private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(schemaDescriptor);
 
     private static final BinaryTupleSchema PK_INDEX_SCHEMA = BinaryTupleSchema.createKeySchema(schemaDescriptor);
@@ -160,13 +163,7 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest {
     static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) {
         TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
 
-        handler.handleUpdate(
-                TX_ID,
-                rowUuid,
-                partitionId,
-                row == null ? null : row.byteBuffer(),
-                (unused) -> {}
-        );
+        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {});
     }
 
     static BinaryRow defaultRow() {
@@ -226,22 +223,23 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest {
         USE_UPDATE {
             @Override
             void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) {
-                handler.handleUpdate(
-                        TX_ID,
-                        rowUuid,
-                        partitionId,
-                        row == null ? null : row.byteBuffer(),
-                        (unused) -> {}
-                );
+                handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {});
             }
         },
         /** Uses updateAll api. */
         USE_UPDATE_ALL {
             @Override
             void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+                BinaryRowMessage rowMessage = row == null
+                        ? null
+                        : MSG_FACTORY.binaryRowMessage()
+                                .binaryTuple(row.tupleSlice())
+                                .schemaVersion(row.schemaVersion())
+                                .build();
+
                 handler.handleUpdateAll(
                         TX_ID,
-                        singletonMap(rowUuid, row == null ? null : row.byteBuffer()),
+                        singletonMap(rowUuid, rowMessage),
                         partitionId,
                         (unused) -> {}
                 );
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 60d728e269..c4a661e310 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Locale;
@@ -32,14 +31,15 @@ import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
@@ -52,13 +52,10 @@ import org.junit.jupiter.api.Test;
  */
 public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
     /** Key-value marshaller for tests. */
-    protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+    private static KvMarshaller<TestKey, TestValue> kvMarshaller;
 
     /** Message factory to create messages - RAFT commands. */
-    private TableMessagesFactory msgFactory = new TableMessagesFactory();
-
-    /** Factory for replica messages. */
-    private ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
 
     @BeforeAll
     static void beforeAll() {
@@ -84,15 +81,13 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
                         .build()
                 )
                 .rowUuid(UUID.randomUUID())
-                .rowBuffer(byteBufferFromBinaryRow(1))
+                .rowMessage(binaryRowMessage(1))
                 .txId(TestTransactionIds.newTransactionId())
                 .build();
 
         UpdateCommand readCmd = copyCommand(cmd);
 
-        assertEquals(cmd.txId(), readCmd.txId());
-        assertEquals(cmd.rowUuid(), readCmd.rowUuid());
-        assertEquals(cmd.rowBuffer(), readCmd.rowBuffer());
+        assertEquals(cmd, readCmd);
     }
 
     @Test
@@ -111,15 +106,15 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
 
         assertEquals(cmd.txId(), readCmd.txId());
         assertEquals(cmd.rowUuid(), readCmd.rowUuid());
-        assertNull(readCmd.rowBuffer());
+        assertNull(readCmd.rowMessage());
     }
 
     @Test
     public void testUpdateAllCommand() throws Exception {
-        Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>();
+        Map<UUID, BinaryRowMessage> rowsToUpdate = new HashMap<>();
 
         for (int i = 0; i < 10; i++) {
-            rowsToUpdate.put(TestTransactionIds.newTransactionId(), byteBufferFromBinaryRow(i));
+            rowsToUpdate.put(TestTransactionIds.newTransactionId(), binaryRowMessage(i));
         }
 
         var cmd = msgFactory.updateAllCommand()
@@ -136,7 +131,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
 
         assertEquals(cmd.txId(), readCmd.txId());
 
-        for (Map.Entry<UUID, ByteBuffer> entry : cmd.rowsToUpdate().entrySet()) {
+        for (Map.Entry<UUID, BinaryRowMessage> entry : cmd.rowsToUpdate().entrySet()) {
             assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey()));
 
             var readVal = readCmd.rowsToUpdate().get(entry.getKey());
@@ -148,7 +143,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
 
     @Test
     public void testRemoveAllCommand() throws Exception {
-        Map<UUID, ByteBuffer> rowsToRemove = new HashMap<>();
+        Map<UUID, BinaryRowMessage> rowsToRemove = new HashMap<>();
 
         for (int i = 0; i < 10; i++) {
             rowsToRemove.put(TestTransactionIds.newTransactionId(), null);
@@ -245,7 +240,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
                     .txId(updateCommand.txId())
                     .rowUuid(updateCommand.rowUuid())
                     .tablePartitionId(updateCommand.tablePartitionId())
-                    .rowBuffer(updateCommand.rowBuffer())
+                    .rowMessage(updateCommand.rowMessage())
                     .build();
         } else if (cmd instanceof UpdateAllCommand) {
             UpdateAllCommand updateCommand = (UpdateAllCommand) cmd;
@@ -262,8 +257,16 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
         }
     }
 
-    private static ByteBuffer byteBufferFromBinaryRow(int id) throws Exception {
-        return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))).byteBuffer();
+    private BinaryRowMessage binaryRowMessage(int id) throws Exception {
+        Row row = kvMarshaller.marshal(
+                new TestKey(id, String.valueOf(id)),
+                new TestValue(id, String.valueOf(id))
+        );
+
+        return msgFactory.binaryRowMessage()
+                .binaryTuple(row.tupleSlice())
+                .schemaVersion(row.schemaVersion())
+                .build();
     }
 
     /**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index f1d103dc99..d7c4f76f78 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -38,7 +38,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -91,6 +90,7 @@ import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -621,14 +621,12 @@ public class PartitionCommandListenerTest {
      * Inserts all rows.
      */
     private void insertAll() {
-        Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT);
+        Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
         UUID txId = TestTransactionIds.newTransactionId();
         var commitPartId = new TablePartitionId(1, PARTITION_ID);
 
         for (int i = 0; i < KEY_COUNT; i++) {
-            Row row = getTestRow(i, i);
-
-            rows.put(TestTransactionIds.newTransactionId(), row.byteBuffer());
+            rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i));
         }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
@@ -660,12 +658,12 @@ public class PartitionCommandListenerTest {
     private void updateAll(Function<Integer, Integer> keyValueMapper) {
         UUID txId = TestTransactionIds.newTransactionId();
         var commitPartId = new TablePartitionId(1, PARTITION_ID);
-        Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT);
+        Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
-            Row row = getTestRow(i, keyValueMapper.apply(i));
+            BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i));
 
-            rows.put(readRow(row).uuid(), row.byteBuffer());
+            rows.put(readRow(row.asBinaryRow()).uuid(), row);
         }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
@@ -695,12 +693,12 @@ public class PartitionCommandListenerTest {
     private void deleteAll() {
         UUID txId = TestTransactionIds.newTransactionId();
         var commitPartId = new TablePartitionId(1, PARTITION_ID);
-        Map<UUID, ByteBuffer> keyRows = new HashMap<>(KEY_COUNT);
+        Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
-            Row row = getTestRow(i, i);
+            BinaryRowMessage row = getTestRow(i, i);
 
-            keyRows.put(readRow(row).uuid(), null);
+            keyRows.put(readRow(row.asBinaryRow()).uuid(), null);
         }
 
         HybridTimestamp commitTimestamp = hybridClock.now();
@@ -734,8 +732,8 @@ public class PartitionCommandListenerTest {
 
         commandListener.onWrite(iterator((i, clo) -> {
             UUID txId = TestTransactionIds.newTransactionId();
-            Row row = getTestRow(i, keyValueMapper.apply(i));
-            RowId rowId = readRow(row);
+            BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i));
+            RowId rowId = readRow(row.asBinaryRow());
 
             assertNotNull(rowId);
 
@@ -749,7 +747,7 @@ public class PartitionCommandListenerTest {
                                     .tableId(1)
                                     .partitionId(PARTITION_ID).build())
                             .rowUuid(rowId.uuid())
-                            .rowBuffer(row.byteBuffer())
+                            .rowMessage(row)
                             .txId(txId)
                             .safeTimeLong(hybridClock.nowLong())
                             .build());
@@ -779,7 +777,7 @@ public class PartitionCommandListenerTest {
 
         commandListener.onWrite(iterator((i, clo) -> {
             UUID txId = TestTransactionIds.newTransactionId();
-            Row row = getTestRow(i, i);
+            BinaryRow row = getTestRow(i, i).asBinaryRow();
             RowId rowId = readRow(row);
 
             assertNotNull(rowId);
@@ -857,7 +855,6 @@ public class PartitionCommandListenerTest {
 
         commandListener.onWrite(iterator((i, clo) -> {
             UUID txId = TestTransactionIds.newTransactionId();
-            Row row = getTestRow(i, i);
             txIds.add(txId);
 
             when(clo.index()).thenReturn(raftIndex.incrementAndGet());
@@ -868,7 +865,7 @@ public class PartitionCommandListenerTest {
                                     .tableId(1)
                                     .partitionId(PARTITION_ID).build())
                             .rowUuid(UUID.randomUUID())
-                            .rowBuffer(row.byteBuffer())
+                            .rowMessage(getTestRow(i, i))
                             .txId(txId)
                             .safeTimeLong(hybridClock.nowLong())
                             .build());
@@ -909,13 +906,18 @@ public class PartitionCommandListenerTest {
      *
      * @return Row.
      */
-    private Row getTestRow(int key, int val) {
+    private BinaryRowMessage getTestRow(int key, int val) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA);
 
         rowBuilder.appendInt(key);
         rowBuilder.appendInt(val);
 
-        return new Row(SCHEMA, rowBuilder.build());
+        BinaryRow row = rowBuilder.build();
+
+        return msgFactory.binaryRowMessage()
+                .binaryTuple(row.tupleSlice())
+                .schemaVersion(row.schemaVersion())
+                .build();
     }
 
     private void invokeBatchedCommand(WriteCommand cmd) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 43a7b826ca..fbae8d3e71 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -44,7 +44,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -85,6 +84,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -352,7 +352,7 @@ public class IncomingSnapshotCopierTest {
 
             Collections.reverse(readResults);
 
-            List<ByteBuffer> rowVersions = new ArrayList<>();
+            List<BinaryRowMessage> rowVersions = new ArrayList<>();
             long[] timestamps = new long[readResults.size() + (readResults.get(0).isWriteIntent() ? -1 : 0)];
 
             UUID txId = null;
@@ -361,7 +361,12 @@ public class IncomingSnapshotCopierTest {
 
             int j = 0;
             for (ReadResult readResult : readResults) {
-                rowVersions.add(readResult.binaryRow().byteBuffer());
+                BinaryRowMessage rowMessage = TABLE_MSG_FACTORY.binaryRowMessage()
+                        .binaryTuple(readResult.binaryRow().tupleSlice())
+                        .schemaVersion(readResult.binaryRow().schemaVersion())
+                        .build();
+
+                rowVersions.add(rowMessage);
 
                 if (readResult.isWriteIntent()) {
                     txId = readResult.transactionId();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 05f5199784..56afb037c2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -30,12 +30,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -52,6 +54,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
 class OutgoingSnapshotMvDataStreamingTest {
+    private static final BinaryRow ROW_1 = new BinaryRowImpl(0, ByteBuffer.wrap(new byte[]{1}));
+    private static final BinaryRow ROW_2 = new BinaryRowImpl(0, ByteBuffer.wrap(new byte[]{2}));
+
     @Mock
     private PartitionAccess partitionAccess;
 
@@ -93,10 +98,10 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsCommittedAndUncommittedVersionsFromStorage() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
         ReadResult version2 = ReadResult.createFromWriteIntent(
                 rowId1,
-                new ByteBufferRow(new byte[]{2}),
+                ROW_2,
                 transactionId,
                 commitTableId,
                 42,
@@ -118,8 +123,8 @@ class OutgoingSnapshotMvDataStreamingTest {
         assertThat(responseRow.timestamps(), is(equalTo(new long[] {version1.commitTimestamp().longValue()})));
 
         assertThat(responseRow.rowVersions(), hasSize(2));
-        assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{1}));
-        assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{2}));
+        assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_1));
+        assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_2));
     }
 
     private void configurePartitionAccessToHaveExactlyOneRowWith(List<ReadResult> versions) {
@@ -148,8 +153,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void reversesOrderOfVersionsObtainedFromPartition() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
 
         configurePartitionAccessToHaveExactlyOneRowWith(List.of(version1, version2));
 
@@ -164,14 +169,14 @@ class OutgoingSnapshotMvDataStreamingTest {
         );
 
         assertThat(responseRow.rowVersions(), hasSize(2));
-        assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{2}));
-        assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{1}));
+        assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_2));
+        assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_1));
     }
 
     @Test
     void iteratesRowsInPartition() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId2, ROW_2, clock.now());
 
         when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1));
@@ -184,9 +189,9 @@ class OutgoingSnapshotMvDataStreamingTest {
         assertThat(response.rows(), hasSize(2));
 
         assertThat(response.rows().get(0).rowVersions(), hasSize(1));
-        assertThat(response.rows().get(0).rowVersions().get(0).array(), is(new byte[]{1}));
+        assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(ROW_1));
         assertThat(response.rows().get(1).rowVersions(), hasSize(1));
-        assertThat(response.rows().get(1).rowVersions().get(0).array(), is(new byte[]{2}));
+        assertThat(response.rows().get(1).rowVersions().get(0).asBinaryRow(), is(ROW_2));
     }
 
     @Test
@@ -209,10 +214,10 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsCommittedAndUncommittedVersionsFromQueue() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now());
         ReadResult version2 = ReadResult.createFromWriteIntent(
                 rowIdOutOfOrder,
-                new ByteBufferRow(new byte[]{2}),
+                ROW_2,
                 transactionId,
                 commitTableId,
                 42,
@@ -244,8 +249,8 @@ class OutgoingSnapshotMvDataStreamingTest {
         assertThat(responseRow.timestamps(), is(equalTo(new long[] {version1.commitTimestamp().longValue()})));
 
         assertThat(responseRow.rowVersions(), hasSize(2));
-        assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{1}));
-        assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{2}));
+        assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_1));
+        assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_2));
     }
 
     private void configureClosestRowIdToBeEmpty() {
@@ -254,8 +259,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsOutOfOrderRowsWithHighestPriority() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
 
         when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1));
 
@@ -309,7 +314,7 @@ class OutgoingSnapshotMvDataStreamingTest {
     void doesNotSendWriteIntentTimestamp() {
         ReadResult version = ReadResult.createFromWriteIntent(
                 rowId1,
-                new ByteBufferRow(new byte[]{1}),
+                ROW_1,
                 transactionId,
                 commitTableId,
                 42,
@@ -334,8 +339,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId2, ROW_2, clock.now());
 
         when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1));
@@ -349,8 +354,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
 
         when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1));
 
@@ -373,8 +378,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void mvDataResponseThatIsNotLastHasFinishFalse() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
 
         when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2));
@@ -387,7 +392,9 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsRowsFromPartitionBiggerThanHint() {
-        ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[1000]), clock.now());
+        var row = new BinaryRowImpl(0, ByteBuffer.allocate(1000));
+
+        ReadResult version = ReadResult.createFromCommitted(rowId1, row, clock.now());
 
         configurePartitionAccessToHaveExactlyOneRowWith(List.of(version));
 
@@ -395,12 +402,14 @@ class OutgoingSnapshotMvDataStreamingTest {
 
         assertThat(response.rows(), hasSize(1));
         assertThat(response.rows().get(0).rowVersions(), hasSize(1));
-        assertThat(response.rows().get(0).rowVersions().get(0).limit(), is(1000));
+        assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(row));
     }
 
     @Test
     void sendsRowsFromOutOfOrderQueueBiggerThanHint() {
-        ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[1000]), clock.now());
+        var row = new BinaryRowImpl(0, ByteBuffer.allocate(1000));
+
+        ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, row, clock.now());
 
         when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version));
 
@@ -418,7 +427,7 @@ class OutgoingSnapshotMvDataStreamingTest {
 
         assertThat(response.rows(), hasSize(1));
         assertThat(response.rows().get(0).rowVersions(), hasSize(1));
-        assertThat(response.rows().get(0).rowVersions().get(0).limit(), is(1000));
+        assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(row));
     }
 
     @Test
@@ -434,8 +443,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void lastSentRowIdIsPassed() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
 
         when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2));
@@ -454,8 +463,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void notYetSentRowIdIsNotPassed() {
-        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
 
         when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2));
@@ -474,7 +483,7 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void anyRowIdIsPassedForFinishedSnapshot() {
-        ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
 
         configurePartitionAccessToHaveExactlyOneRowWith(List.of(version));
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index eb73abcd65..c1def6368f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -253,7 +254,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 .term(1L)
                 .commitPartitionId(PARTITION_ID)
                 .transactionId(TRANSACTION_ID)
-                .binaryRowBytes(testBinaryRow.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(testBinaryRow))
                 .requestType(arg.type)
                 .build());
 
@@ -299,7 +300,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 .term(1L)
                 .commitPartitionId(PARTITION_ID)
                 .transactionId(TRANSACTION_ID)
-                .binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
+                .binaryRowMessages(rows.stream().map(PartitionReplicaListenerIndexLockingTest::binaryRowMessage).collect(toList()))
                 .requestType(arg.type)
                 .build());
 
@@ -397,6 +398,13 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
         };
     }
 
+    private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
+        return TABLE_MESSAGES_FACTORY.binaryRowMessage()
+                .binaryTuple(binaryRow.tupleSlice())
+                .schemaVersion(binaryRow.schemaVersion())
+                .build();
+    }
+
     static class ReadWriteTestArg {
         private final RequestType type;
         private final LockMode expectedLockOnUniqueHash;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index e9fbfe4e29..0b6fdd5e17 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -115,6 +116,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
@@ -513,7 +515,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .binaryRowBytes(testBinaryKey.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(testBinaryKey))
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -536,7 +538,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .binaryRowBytes(testBinaryKey.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(testBinaryKey))
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -559,7 +561,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .binaryRowBytes(testBinaryKey.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(testBinaryKey))
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -581,7 +583,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .binaryRowBytes(testBinaryKey.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(testBinaryKey))
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -604,7 +606,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .binaryRowBytes(testBinaryKey.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(testBinaryKey))
                 .requestType(RequestType.RO_GET)
                 .build());
 
@@ -985,7 +987,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(txId)
                 .requestType(requestType)
-                .binaryRowBytes(binaryRow.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(binaryRow))
                 .term(1L)
                 .commitPartitionId(commitPartitionId())
                 .build()
@@ -1001,7 +1003,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(txId)
                 .requestType(requestType)
-                .binaryRowsBytes(binaryRows.stream().map(BinaryRow::byteBuffer).collect(toList()))
+                .binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
                 .term(1L)
                 .commitPartitionId(commitPartitionId())
                 .build()
@@ -1022,7 +1024,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                             .groupId(grpId)
                             .transactionId(txId)
                             .requestType(RequestType.RW_INSERT)
-                            .binaryRowBytes(binaryRow.byteBuffer())
+                            .binaryRowMessage(binaryRowMessage(binaryRow))
                             .term(1L)
                             .commitPartitionId(commitPartitionId())
                             .build();
@@ -1049,7 +1051,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                             .groupId(grpId)
                             .transactionId(txId)
                             .requestType(RequestType.RW_UPSERT_ALL)
-                            .binaryRowsBytes(asList(binaryRow0.byteBuffer(), binaryRow1.byteBuffer()))
+                            .binaryRowMessages(asList(binaryRowMessage(binaryRow0), binaryRowMessage(binaryRow1)))
                             .term(1L)
                             .commitPartitionId(commitPartitionId())
                             .build();
@@ -1072,7 +1074,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
                 BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
 
-                if (rowEquals(binaryRow, row)) {
+                if (binaryRow.equals(row)) {
                     found = true;
                 }
             }
@@ -1083,7 +1085,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
             BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
 
-            assertTrue(row == null || !rowEquals(row, binaryRow));
+            assertTrue(row == null || !row.equals(binaryRow));
         }
     }
 
@@ -1226,11 +1228,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                     ? (upsertAfterDelete ? br1 : null)
                     : (insertFirst ? br1 : null);
 
-            if (expected == null) {
-                assertNull(res);
-            } else {
-                assertTrue(rowEquals(expected, res));
-            }
+            assertEquals(expected, res);
         }
 
         cleanup(tx1);
@@ -1465,8 +1463,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(targetTxId)
                         .requestType(RequestType.RW_REPLACE)
-                        .oldBinaryRowBytes(binaryRow(key, new TestValue(1, "v1"), kvMarshaller).byteBuffer())
-                        .binaryRowBytes(binaryRow(key, new TestValue(3, "v3"), kvMarshaller).byteBuffer())
+                        .oldBinaryRowMessage(binaryRowMessage(binaryRow(key, new TestValue(1, "v1"), kvMarshaller)))
+                        .binaryRowMessage(binaryRowMessage(binaryRow(key, new TestValue(3, "v3"), kvMarshaller)))
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
                         .build()
@@ -1528,7 +1526,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RW_UPSERT)
                 .transactionId(txId)
-                .binaryRowBytes(row.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(row))
                 .term(1L)
                 .commitPartitionId(new TablePartitionId(tblId, partId))
                 .build()
@@ -1540,7 +1538,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RW_DELETE)
                 .transactionId(txId)
-                .binaryRowBytes(row.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(row))
                 .term(1L)
                 .commitPartitionId(new TablePartitionId(tblId, partId))
                 .build()
@@ -1552,7 +1550,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RO_GET)
                 .readTimestampLong(readTimestamp)
-                .binaryRowBytes(row.byteBuffer())
+                .binaryRowMessage(binaryRowMessage(row))
                 .build()
         );
 
@@ -1564,7 +1562,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .requestType(RequestType.RO_GET_ALL)
                 .readTimestampLong(readTimestamp)
-                .binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
+                .binaryRowMessages(rows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
                 .build()
         );
 
@@ -1619,11 +1617,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     protected BinaryRow binaryRow(int i) {
-        try {
-            return kvMarshaller.marshal(new TestKey(i, "k" + i), new TestValue(i, "v" + i));
-        } catch (MarshallerException e) {
-            throw new AssertionError(e);
-        }
+        return binaryRow(new TestKey(i, "k" + i), new TestValue(i, "v" + i));
     }
 
     private BinaryRow binaryRow(TestKey key, TestValue value) {
@@ -1632,7 +1626,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private static BinaryRow binaryRow(TestKey key, TestValue value, KvMarshaller<TestKey, TestValue> marshaller) {
         try {
-            return marshaller.marshal(key, value);
+            Row row = marshaller.marshal(key, value);
+
+            return new BinaryRowImpl(row.schemaVersion(), row.tupleSlice());
         } catch (MarshallerException e) {
             throw new AssertionError(e);
         }
@@ -1654,10 +1650,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
     }
 
-    private static boolean rowEquals(BinaryRow row1, BinaryRow row2) {
-        return row1.schemaVersion() == row2.schemaVersion()
-                && row1.hasValue() == row2.hasValue()
-                && row1.tupleSlice().equals(row2.tupleSlice());
+    private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
+        return TABLE_MESSAGES_FACTORY.binaryRowMessage()
+                .binaryTuple(binaryRow.tupleSlice())
+                .schemaVersion(binaryRow.schemaVersion())
+                .build();
     }
 
     /**