You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/06/02 07:09:24 UTC

[ignite-3] branch main updated: IGNITE-14239 Raft based implementation of atomic protocol. Fixes #118

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e912b5  IGNITE-14239 Raft based implementation of atomic protocol. Fixes #118
4e912b5 is described below

commit 4e912b58dcde8a46ff4da4769a26c7c301ab691e
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Jun 2 10:00:25 2021 +0300

    IGNITE-14239 Raft based implementation of atomic protocol. Fixes #118
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../java/org/apache/ignite/table/KeyValueView.java | 151 +++--
 .../java/org/apache/ignite/table/TableView.java    | 154 +++--
 .../ignite/distributed/ITDistributedTableTest.java | 242 ++++++-
 .../ignite/internal/table/KVBinaryViewImpl.java    | 137 ++--
 .../apache/ignite/internal/table/KVViewImpl.java   |  62 +-
 .../ignite/internal/table/RecordViewImpl.java      |  72 +--
 .../apache/ignite/internal/table/TableImpl.java    | 168 +++--
 .../table/distributed/command/CommandUtils.java    | 158 +++++
 .../distributed/command/DeleteAllCommand.java      |  68 ++
 .../table/distributed/command/DeleteCommand.java   |  43 +-
 .../distributed/command/DeleteExactAllCommand.java |  68 ++
 ...{InsertCommand.java => DeleteExactCommand.java} |  50 +-
 .../table/distributed/command/GetAllCommand.java   |  68 ++
 ...DeleteCommand.java => GetAndDeleteCommand.java} |  50 +-
 ...nsertCommand.java => GetAndReplaceCommand.java} |  49 +-
 ...DeleteCommand.java => GetAndUpsertCommand.java} |  52 +-
 .../table/distributed/command/GetCommand.java      |  43 +-
 .../distributed/command/InsertAllCommand.java      |  68 ++
 .../table/distributed/command/InsertCommand.java   |  43 +-
 .../table/distributed/command/ReplaceCommand.java  |  55 +-
 ...sertCommand.java => ReplaceIfExistCommand.java} |  49 +-
 .../distributed/command/UpsertAllCommand.java      |  68 ++
 .../table/distributed/command/UpsertCommand.java   |  43 +-
 .../command/response/MultiRowsResponse.java        |  71 +++
 .../{KVGetResponse.java => SingleRowResponse.java} |  57 +-
 .../table/distributed/raft/PartitionListener.java  | 243 ++++++-
 .../distributed/storage/InternalTableImpl.java     | 148 ++++-
 .../raft/PartitionCommandListenerTest.java         | 700 +++++++++++++++++++++
 28 files changed, 2426 insertions(+), 754 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
index f58fb65..ef0a80d 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
@@ -36,176 +36,197 @@ public interface KeyValueView<K, V> {
     /**
      * Gets a value associated with the given key.
      *
-     * @param key The key whose associated value is to be returned.
+     * @param key A key whose associated value is to be returned.
+     * The key cannot be {@code null}.
      * @return Value or {@code null}, if it does not exist.
      */
-    V get(K key);
+    V get(@NotNull K key);
 
     /**
      * Asynchronously gets a value associated with the given key.
      *
-     * @param key The key whose associated value is to be returned.
+     * @param key A key whose associated value is to be returned.
+     * The key cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<V> getAsync(K key);
+    @NotNull CompletableFuture<V> getAsync(@NotNull K key);
 
     /**
      * Get values associated with given keys.
      *
-     * @param keys Keys whose associated values are to be returned.
+     * @param keys Keys whose associated values are to be returned.
+     * The keys cannot be {@code null}.
      * @return Values associated with given keys.
      */
-    Map<K, V> getAll(Collection<K> keys);
+    Map<K, V> getAll(@NotNull Collection<K> keys);
 
     /**
      * Get values associated with given keys.
      *
      * @param keys Keys whose associated values are to be returned.
+     * The keys cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys);
+    @NotNull CompletableFuture<Map<K, V>> getAllAsync(@NotNull Collection<K> keys);
 
     /**
      * Determines if the table contains an entry for the specified key.
      *
-     * @param key The key whose presence is to be tested.
+     * @param key A key whose presence is to be tested.
+     * The key cannot be {@code null}.
      * @return {@code True} if a value exists for the specified key, {@code false} otherwise.
      */
-    boolean contains(K key);
+    boolean contains(@NotNull K key);
 
     /**
      * Puts value associated with given key into the table.
      *
-     * @param key Key with which the specified value is to be associated.
+     * @param key A key with which the specified value is to be associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      */
-    void put(K key, V val);
+    void put(@NotNull K key, V val);
 
     /**
      * Asynchronously puts value associated with given key into the table.
      *
-     * @param key Key with which the specified value is to be associated.
+     * @param key A key with which the specified value is to be associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Void> putAsync(K key, V val);
+    @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val);
 
     /**
      * Put associated key-value pairs.
      *
      * @param pairs Key-value pairs.
+     * The pairs cannot be {@code null}.
      */
-    void putAll(Map<K, V> pairs);
+    void putAll(@NotNull Map<K, V> pairs);
 
     /**
      * Asynchronously put associated key-value pairs.
      *
      * @param pairs Key-value pairs.
+     * The pairs cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Void> putAllAsync(Map<K, V> pairs);
+    @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<K, V> pairs);
 
     /**
      * Puts new or replaces existed value associated with given key into the table.
      *
-     * @param key Key with which the specified value is to be associated.
+     * @param key A key with which the specified value is to be associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Replaced value or {@code null}, if not existed.
      */
-    V getAndPut(K key, V val);
+    V getAndPut(@NotNull K key, V val);
 
     /**
      * Asynchronously puts new or replaces existed value associated with given key into the table.
      *
-     * @param key Key with which the specified value is to be associated.
+     * @param key A key with which the specified value is to be associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<V> getAndPutAsync(K key, V val);
+    @NotNull CompletableFuture<V> getAndPutAsync(@NotNull K key, V val);
 
     /**
      * Puts value associated with given key into the table if not exists.
      *
-     * @param key Key with which the specified value is to be associated.
+     * @param key A key with which the specified value is to be associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return {@code True} if successful, {@code false} otherwise.
      */
-    boolean putIfAbsent(K key, @NotNull V val);
+    boolean putIfAbsent(@NotNull K key, @NotNull V val);
 
     /**
      * Asynchronously puts value associated with given key into the table if not exists.
      *
      * @param key Key with which the specified value is to be associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> putIfAbsentAsync(K key, V val);
+    @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, V val);
 
     /**
      * Removes value associated with given key from the table.
      *
-     * @param key Key whose mapping is to be removed from the table.
+     * @param key A key whose mapping is to be removed from the table.
+     * The key cannot be {@code null}.
      * @return {@code True} if a value associated with the specified key was successfully removed, {@code false} otherwise.
      */
-    boolean remove(K key);
+    boolean remove(@NotNull K key);
 
     /**
      * Asynchronously removes value associated with given key from the table.
      *
-     * @param key Key whose mapping is to be removed from the table.
+     * @param key A key whose mapping is to be removed from the table.
+     * The key cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> removeAsync(K key);
+    @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key);
 
     /**
-     * Removes expected value associated with given key from the table.
+     * Removes an expected value associated with the given key from the table.
      *
-     * @param key Key whose associated value is to be removed from the table.
-     * @param val Expected value.
+     * @param key A key whose associated value is to be removed from the table.
+     * The key cannot be {@code null}.
+     * @param val Expected value. The value cannot be {@code null}.
      * @return {@code True} if the expected value for the specified key was successfully removed, {@code false} otherwise.
      */
-    boolean remove(K key, @NotNull V val);
+    boolean remove(@NotNull K key, @NotNull V val);
 
     /**
      * Asynchronously removes expected value associated with given key from the table.
      *
-     * @param key Key whose associated value is to be removed from the table.
-     * @param val Expected value.
+     * @param key A key whose associated value is to be removed from the table.
+     * The key cannot be {@code null}.
+     * @param val Expected value. The value cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> removeAsync(K key, V val);
+    @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key, @NotNull V val);
 
     /**
      * Remove values associated with given keys from the table.
      *
-     * @param keys Keys whose mapping is to be removed from the table.
+     * @param keys Keys whose mapping is to be removed from the table.
+     * The keys cannot be {@code null}.
      * @return Keys whose values were not existed.
      */
-    Collection<K> removeAll(Collection<K> keys);
+    Collection<K> removeAll(@NotNull Collection<K> keys);
 
     /**
      * Asynchronously remove values associated with given keys from the table.
      *
-     * @param keys Keys whose mapping is to be removed from the table.
+     * @param keys Keys whose mapping is to be removed from the table.
+     * The keys cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<K> removeAllAsync(Collection<K> keys);
+    @NotNull CompletableFuture<Collection<K>> removeAllAsync(@NotNull Collection<K> keys);
 
     /**
      * Gets then removes value associated with given key from the table.
      *
-     * @param key Key whose associated value is to be removed from the table.
+     * @param key A key whose associated value is to be removed from the table.
+     * The key cannot be {@code null}.
      * @return Removed value or {@code null}, if not existed.
      */
-    V getAndRemove(K key);
+    V getAndRemove(@NotNull K key);
 
     /**
      * Asynchronously gets then removes value associated with given key from the table.
      *
-     * @param key Key whose mapping is to be removed from the table.
+     * @param key A key whose mapping is to be removed from the table.
+     * The key cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<V> getAndRemoveAsync(K key);
+    @NotNull CompletableFuture<V> getAndRemoveAsync(@NotNull K key);
 
     /**
      * Replaces the value for a key only if exists. This is equivalent to
@@ -218,21 +239,23 @@ public interface KeyValueView<K, V> {
      * }</code></pre>
      * except that the action is performed atomically.
      *
-     * @param key Key with which the specified value is associated.
+     * @param key A key with which the specified value is associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return {@code True} if an old value was replaced, {@code false} otherwise.
      */
-    boolean replace(K key, V val);
+    boolean replace(@NotNull K key, V val);
 
     /**
      * Asynchronously replaces the value for a key only if exists.
      * See {@link #replace(Object, Object)}.
      *
-     * @param key Key with which the specified value is associated.
+     * @param key A key with which the specified value is associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> replaceAsync(K key, V val);
+    @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val);
 
     /**
      * Replaces the expected value for a key. This is equivalent to
@@ -245,23 +268,25 @@ public interface KeyValueView<K, V> {
      * }</code></pre>
      * except that the action is performed atomically.
      *
-     * @param key Key with which the specified value is associated.
+     * @param key A key with which the specified value is associated.
+     * The key cannot be {@code null}.
      * @param oldVal Expected value associated with the specified key.
      * @param newVal Value to be associated with the specified key.
      * @return {@code True} if an old value was replaced, {@code false} otherwise.
      */
-    boolean replace(K key, V oldVal, V newVal);
+    boolean replace(@NotNull K key, V oldVal, V newVal);
 
     /**
      * Asynchronously replaces the expected value for a key.
      * See {@link #replace(Object, Object, Object)}
      *
-     * @param key Key with which the specified value is associated.
+     * @param key A key with which the specified value is associated.
+     * The key cannot be {@code null}.
      * @param oldVal Expected value associated with the specified key.
      * @param newVal Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> replaceAsync(K key, V oldVal, V newVal);
+    @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V oldVal, V newVal);
 
     /**
      * Replaces the value for a given key only if exists. This is equivalent to
@@ -276,45 +301,51 @@ public interface KeyValueView<K, V> {
      * </code></pre>
      * except that the action is performed atomically.
      *
-     * @param key Key with which the specified value is associated.
+     * @param key A key with which the specified value is associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Replaced value, or {@code null} if not existed.
      */
-    V getAndReplace(K key, V val);
+    V getAndReplace(@NotNull K key, V val);
 
     /**
      * Asynchronously replaces the value for a given key only if exists.
      * See {@link #getAndReplace(Object, Object)}
      *
-     * @param key Key with which the specified value is associated.
+     * @param key A key with which the specified value is associated.
+     * The key cannot be {@code null}.
      * @param val Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<V> getAndReplaceAsync(K key, V val);
+    @NotNull CompletableFuture<V> getAndReplaceAsync(@NotNull K key, V val);
 
     /**
      * Executes invoke processor code against the value associated with the provided key.
      *
-     * @param key Key associated with the value that invoke processor will be applied to.
+     * @param key A key associated with the value that invoke processor will be applied to.
+     * The key cannot be {@code null}.
      * @param proc Invoke processor.
      * @param args Optional invoke processor arguments.
      * @param <R> Invoke processor result type.
      * @return Result of the processing.
      * @see InvokeProcessor
      */
-    <R extends Serializable> R invoke(K key, InvokeProcessor<K, V, R> proc, Serializable... args);
+    <R extends Serializable> R invoke(@NotNull K key, InvokeProcessor<K, V, R> proc, Serializable... args);
 
     /**
      * Asynchronously executes invoke processor code against the value associated with the provided key.
      *
-     * @param key Key associated with the value that invoke processor will be applied to.
+     * @param key A key associated with the value that invoke processor will be applied to.
+     * The key cannot be {@code null}.
      * @param proc Invoke processor.
      * @param args Optional invoke processor arguments.
      * @param <R> Invoke processor result type.
      * @return Future representing pending completion of the operation.
      * @see InvokeProcessor
      */
-    @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(K key, InvokeProcessor<K, V, R> proc,
+    @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
+        @NotNull K key,
+        InvokeProcessor<K, V, R> proc,
         Serializable... args);
 
     /**
@@ -322,13 +353,14 @@ public interface KeyValueView<K, V> {
      *
      * @param <R> Invoke processor result type.
      * @param keys Ordered collection of keys which values associated with should be processed.
+     * The keys cannot be {@code null}.
      * @param proc Invoke processor.
      * @param args Optional invoke processor arguments.
      * @return Results of the processing.
      * @see InvokeProcessor
      */
     <R extends Serializable> Map<K, R> invokeAll(
-        Collection<K> keys,
+        @NotNull Collection<K> keys,
         InvokeProcessor<K, V, R> proc,
         Serializable... args);
 
@@ -337,13 +369,14 @@ public interface KeyValueView<K, V> {
      *
      * @param <R> Invoke processor result type.
      * @param keys Ordered collection of keys which values associated with should be processed.
+     * The keys cannot be {@code null}.
      * @param proc Invoke processor.
      * @param args Optional invoke processor arguments.
      * @return Future representing pending completion of the operation.
      * @see InvokeProcessor
      */
     @NotNull <R extends Serializable> CompletableFuture<Map<K, R>> invokeAllAsync(
-        Collection<K> keys,
+        @NotNull Collection<K> keys,
         InvokeProcessor<K, V, R> proc,
         Serializable... args);
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/table/TableView.java b/modules/api/src/main/java/org/apache/ignite/table/TableView.java
index 02ac2a1..6ea9109 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/TableView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/TableView.java
@@ -34,285 +34,321 @@ public interface TableView<R> {
     /**
      * Gets a record with same key columns values as given one from the table.
      *
-     * @param keyRec Record with key columns set.
-     * @return Record with all columns filled from the table.
+     * @param keyRec A record with key columns set.
+     * The record cannot be {@code null}.
+     * @return A record with all columns filled from the table.
      */
-    R get(R keyRec);
+    R get(@NotNull R keyRec);
 
     /**
      * Asynchronously gets a record with same key columns values as given one from the table.
      *
-     * @param keyRec Record with key columns set.
+     * @param keyRec A record with key columns set.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<R> getAsync(R keyRec);
+    @NotNull CompletableFuture<R> getAsync(@NotNull R keyRec);
 
     /**
      * Get records from the table.
      *
      * @param keyRecs Records with key columns set.
+     * The records cannot be {@code null}.
      * @return Records with all columns filled from the table.
      */
-    Collection<R> getAll(Collection<R> keyRecs);
+    Collection<R> getAll(@NotNull Collection<R> keyRecs);
 
     /**
      * Asynchronously get records from the table.
      *
      * @param keyRecs Records with key columns set.
+     * The records cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Collection<R>> getAllAsync(Collection<R> keyRecs);
+    @NotNull CompletableFuture<Collection<R>> getAllAsync(@NotNull Collection<R> keyRecs);
 
     /**
      * Inserts a record into the table if does not exist or replaces the existed one.
      *
-     * @param rec Record to insert into the table.
+     * @param rec A record to insert into the table.
+     * The record cannot be {@code null}.
      */
-    void upsert(R rec);
+    void upsert(@NotNull R rec);
 
     /**
      * Asynchronously inserts a record into the table if does not exist or replaces the existed one.
      *
-     * @param rec Record to insert into the table.
+     * @param rec A record to insert into the table.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Void> upsertAsync(R rec);
+    @NotNull CompletableFuture<Void> upsertAsync(@NotNull R rec);
 
     /**
      * Insert records into the table if does not exist or replaces the existed one.
      *
      * @param recs Records to insert into the table.
+     * The records cannot be {@code null}.
      */
-    void upsertAll(Collection<R> recs);
+    void upsertAll(@NotNull Collection<R> recs);
 
     /**
      * Asynchronously inserts a record into the table if does not exist or replaces the existed one.
      *
      * @param recs Records to insert into the table.
+     * The records cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Void> upsertAllAsync(Collection<R> recs);
+    @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<R> recs);
 
     /**
      * Inserts a record into the table or replaces if exists and return replaced previous record.
      *
-     * @param rec Record to insert into the table.
+     * @param rec A record to insert into the table.
+     * The record cannot be {@code null}.
      * @return Replaced record or {@code null} if not existed.
      */
-    R getAndUpsert(R rec);
+    R getAndUpsert(@NotNull R rec);
 
     /**
      * Asynchronously inserts a record into the table or replaces if exists and return replaced previous record.
      *
-     * @param rec Record to insert into the table.
+     * @param rec A record to insert into the table.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<R> getAndUpsertAsync(R rec);
+    @NotNull CompletableFuture<R> getAndUpsertAsync(@NotNull R rec);
 
     /**
      * Inserts a record into the table if not exists.
      *
-     * @param rec Record to insert into the table.
+     * @param rec A record to insert into the table.
+     * The record cannot be {@code null}.
      * @return {@code True} if successful, {@code false} otherwise.
      */
-    boolean insert(R rec);
+    boolean insert(@NotNull R rec);
 
     /**
      * Asynchronously inserts a record into the table if not exists.
      *
-     * @param rec Record to insert into the table.
+     * @param rec A record to insert into the table.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> insertAsync(R rec);
+    @NotNull CompletableFuture<Boolean> insertAsync(@NotNull R rec);
 
     /**
      * Insert records into the table which do not exist, skipping existed ones.
      *
      * @param recs Records to insert into the table.
+     * The records cannot be {@code null}.
      * @return Skipped records.
      */
-    Collection<R> insertAll(Collection<R> recs);
+    Collection<R> insertAll(@NotNull Collection<R> recs);
 
     /**
      * Asynchronously insert records into the table which do not exist, skipping existed ones.
      *
      * @param recs Records to insert into the table.
+     * The records cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Collection<R>> insertAllAsync(Collection<R> recs);
+    @NotNull CompletableFuture<Collection<R>> insertAllAsync(@NotNull Collection<R> recs);
 
     /**
      * Replaces an existed record associated with the same key columns values as the given one has.
      *
-     * @param rec Record to replace with.
+     * @param rec A record to replace with.
+     * The record cannot be {@code null}.
      * @return {@code True} if old record was found and replaced successfully, {@code false} otherwise.
      */
-    boolean replace(R rec);
+    boolean replace(@NotNull R rec);
 
     /**
      * Asynchronously replaces an existed record associated with the same key columns values as the given one has.
      *
-     * @param rec Record to replace with.
+     * @param rec A record to replace with.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> replaceAsync(R rec);
+    @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R rec);
 
     /**
      * Replaces an expected record in the table with the given new one.
      *
-     * @param oldRec Record to replace.
-     * @param newRec Record to replace with.
+     * @param oldRec A record to replace.
+     * The record cannot be {@code null}.
+     * @param newRec A record to replace with.
+     * The record cannot be {@code null}.
      * @return {@code True} if the old record replaced successfully, {@code false} otherwise.
      */
-    boolean replace(R oldRec, R newRec);
+    boolean replace(@NotNull R oldRec, @NotNull R newRec);
 
     /**
      * Asynchronously replaces an expected record in the table with the given new one.
      *
-     * @param oldRec Record to replace.
-     * @param newRec Record to replace with.
+     * @param oldRec A record to replace.
+     * The record cannot be {@code null}.
+     * @param newRec A record to replace with.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> replaceAsync(R oldRec, R newRec);
+    @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R oldRec, @NotNull R newRec);
 
     /**
      * Gets an existed record associated with the same key columns values as the given one has,
      * then replaces with the given one.
      *
-     * @param rec Record to replace with.
+     * @param rec A record to replace with.
+     * The record cannot be {@code null}.
      * @return Replaced record or {@code null} if not existed.
      */
-    R getAndReplace(R rec);
+    R getAndReplace(@NotNull R rec);
 
     /**
      * Asynchronously gets an existed record associated with the same key columns values as the given one has,
      * then replaces with the given one.
      *
-     * @param rec Record to replace with.
+     * @param rec A record to replace with.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<R> getAndReplaceAsync(R rec);
+    @NotNull CompletableFuture<R> getAndReplaceAsync(@NotNull R rec);
 
     /**
      * Deletes a record with the same key columns values as the given one from the table.
      *
-     * @param keyRec Record with key columns set.
+     * @param keyRec A record with key columns set.
+     * The record cannot be {@code null}.
      * @return {@code True} if removed successfully, {@code false} otherwise.
      */
-    boolean delete(R keyRec);
+    boolean delete(@NotNull R keyRec);
 
     /**
      * Asynchronously deletes a record with the same key columns values as the given one from the table.
      *
-     * @param keyRec Record with key columns set.
+     * @param keyRec A record with key columns set.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> deleteAsync(R keyRec);
+    @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull R keyRec);
 
     /**
      * Deletes the given record from the table.
      *
-     * @param rec Record to delete.
+     * @param rec A record to delete.
+     * The record cannot be {@code null}.
      * @return {@code True} if removed successfully, {@code false} otherwise.
      */
-    boolean deleteExact(R rec);
+    boolean deleteExact(@NotNull R rec);
 
     /**
      * Asynchronously deletes given record from the table.
      *
-     * @param rec Record to delete.
+     * @param rec A record to delete.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> deleteExactAsync(R rec);
+    @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull R rec);
 
     /**
      * Gets then deletes a record with the same key columns values from the table.
      *
-     * @param rec Record with key columns set.
+     * @param rec A record with key columns set.
+     * The record cannot be {@code null}.
      * @return Removed record or {@code null} if not existed.
      */
-    R getAndDelete(R rec);
+    R getAndDelete(@NotNull R rec);
 
     /**
      * Asynchronously gets then deletes a record with the same key columns values from the table.
      *
-     * @param rec Record with key columns set.
+     * @param rec A record with key columns set.
+     * The record cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<R> getAndDeleteAsync(R rec);
+    @NotNull CompletableFuture<R> getAndDeleteAsync(@NotNull R rec);
 
     /**
      * Remove records with the same key columns values as the given one has from the table.
      *
      * @param recs Records with key columns set.
+     * The records cannot be {@code null}.
      * @return Records with key columns set that were not exists.
      */
-    Collection<R> deleteAll(Collection<R> recs);
+    Collection<R> deleteAll(@NotNull Collection<R> recs);
 
     /**
      * Asynchronously remove records with the same key columns values as the given one has from the table.
      *
      * @param recs Records with key columns set.
+     * The records cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Collection<R>> deleteAllAsync(Collection<R> recs);
+    @NotNull CompletableFuture<Collection<R>> deleteAllAsync(@NotNull Collection<R> recs);
 
     /**
      * Remove given records from the table.
      *
      * @param recs Records to delete.
+     * The records cannot be {@code null}.
      * @return Records that were not deleted.
      */
-    Collection<R> deleteAllExact(Collection<R> recs);
+    Collection<R> deleteAllExact(@NotNull Collection<R> recs);
 
     /**
      * Asynchronously remove given records from the table.
      *
      * @param recs Records to delete.
+     * The records cannot be {@code null}.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs);
+    @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(@NotNull Collection<R> recs);
 
     /**
      * Executes an InvokeProcessor code against a record with the same key columns values as the given one has.
      *
-     * @param keyRec Record with key columns set.
+     * @param keyRec A record with key columns set.
+     * The record cannot be {@code null}.
      * @param proc Invoke processor.
      * @param <T> InvokeProcessor result type.
      * @return Results of the processing.
      */
-    <T extends Serializable> T invoke(R keyRec, InvokeProcessor<R, R, T> proc);
+    <T extends Serializable> T invoke(@NotNull R keyRec, InvokeProcessor<R, R, T> proc);
 
     /**
      * Asynchronously executes an InvokeProcessor code against a record
      * with the same key columns values as the given one has.
      *
-     * @param keyRec Record with key columns set.
+     * @param keyRec A record with key columns set.
+     * The record cannot be {@code null}.
      * @param proc Invoke processor.
      * @param <T> InvokeProcessor result type.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(R keyRec, InvokeProcessor<R, R, T> proc);
+    @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(@NotNull R keyRec, InvokeProcessor<R, R, T> proc);
 
     /**
      * Executes an InvokeProcessor code against records with the same key columns values as the given ones has.
      *
      * @param keyRecs Records with key columns set.
+     * The records cannot be {@code null}.
      * @param proc Invoke processor.
      * @param <T> InvokeProcessor result type.
      * @return Results of the processing.
      */
-    <T extends Serializable> Map<R, T> invokeAll(Collection<R> keyRecs, InvokeProcessor<R, R, T> proc);
+    <T extends Serializable> Map<R, T> invokeAll(@NotNull Collection<R> keyRecs, InvokeProcessor<R, R, T> proc);
 
     /**
      * Asynchronously executes an InvokeProcessor against records with the same key columns values as the given ones
      * has.
      *
      * @param keyRecs Records with key columns set.
+     * The records cannot be {@code null}.
      * @param proc Invoke processor.
      * @param <T> InvokeProcessor result type.
      * @return Results of the processing.
      */
-    @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(Collection<R> keyRecs,
+    @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(@NotNull Collection<R> keyRecs,
         InvokeProcessor<R, R, T> proc);
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 593ac59..72ac0ce 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.distributed;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -36,7 +38,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
-import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.lang.IgniteLogger;
@@ -59,6 +61,7 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
 import org.apache.ignite.raft.server.RaftServer;
 import org.apache.ignite.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.table.KeyValueBinaryView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
@@ -112,6 +115,9 @@ public class ITDistributedTableTest {
     /** Cluster. */
     private ArrayList<ClusterService> cluster = new ArrayList<>();
 
+    /**
+     * Start all cluster nodes before each test.
+     */
     @BeforeEach
     public void beforeTest() {
         for (int i = 0; i < NODES; i++) {
@@ -138,6 +144,11 @@ public class ITDistributedTableTest {
         LOG.info("Client started.");
     }
 
+    /**
+     * Shuts down all cluster nodes after each test.
+     *
+     * @throws Exception If failed.
+     */
     @AfterEach
     public void afterTest() throws Exception {
         for (ClusterService node : cluster) {
@@ -147,6 +158,11 @@ public class ITDistributedTableTest {
         client.shutdown();
     }
 
+    /**
+     * Tests partition listener.
+     *
+     * @throws Exception If failed.
+     */
     @Test
     public void partitionListener() throws Exception {
         String grpId = "part";
@@ -172,7 +188,7 @@ public class ITDistributedTableTest {
 //        Row keyChunk = new Row(SCHEMA, new ByteBufferRow(testRow.keySlice()));
         Row keyChunk = getTestKey();
 
-        CompletableFuture<KVGetResponse> getFut = partRaftGrp.run(new GetCommand(keyChunk));
+        CompletableFuture<SingleRowResponse> getFut = partRaftGrp.run(new GetCommand(keyChunk));
 
         assertNotNull(getFut.get().getValue());
 
@@ -181,6 +197,11 @@ public class ITDistributedTableTest {
         partSrv.shutdown();
     }
 
+    /**
+     * Prepares a test row which contains one field.
+     *
+     * @return Row.
+     */
     @NotNull private Row getTestKey() {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
 
@@ -189,6 +210,11 @@ public class ITDistributedTableTest {
         return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
     }
 
+    /**
+     * Prepares a test row which contains two fields.
+     *
+     * @return Row.
+     */
     @NotNull private Row getTestRow() {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
 
@@ -198,6 +224,9 @@ public class ITDistributedTableTest {
         return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
     }
 
+    /**
+     * The test prepares a distributed table and checks operation over various views.
+     */
     @Test
     public void partitionedTable() {
         HashMap<ClusterNode, RaftServer> raftServers = new HashMap<>(NODES);
@@ -250,71 +279,232 @@ public class ITDistributedTableTest {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Test for Table view [keys=" + keysCnt + ']');
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.insert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build()
+            );
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.upsert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 5))
+                .build()
+            );
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
+        }
+
+        HashSet<Tuple> keys = new HashSet<>();
+
+        for (int i = 0; i < keysCnt; i++) {
+            keys.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+        }
+
+        Collection<Tuple> entries = view.getAll(keys);
+
+        assertEquals(keysCnt, entries.size());
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.replace(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
+                    .set("value", Long.valueOf(i + 5))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
+                    .set("key", Long.valueOf(i))
                     .set("value", Long.valueOf(i + 2))
                     .build());
+
+            assertTrue(res);
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.delete(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertTrue(res);
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertNull(entry);
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            Tuple entry = tbl.kvView().get(
-                tbl.kvView().tupleBuilder()
+        ArrayList<Tuple> batch = new ArrayList<>(keysCnt);
+
+        for (int i = 0; i < keysCnt; i++) {
+            batch.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build());
+        }
+
+        view.upsertAll(batch);
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        view.deleteAll(keys);
+
+        for (Tuple key : keys) {
+            Tuple entry = view.get(key);
+
+            assertNull(entry);
+        }
+    }
+
+    /**
+     * Checks operation over key-value binary table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableKVBinaryView(KeyValueBinaryView view, int keysCnt) {
+        LOG.info("Test for Key-Value binary view [keys=" + keysCnt + ']');
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.putIfAbsent(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
+                    .build(),
+                view.tupleBuilder()
+                    .set("value", Long.valueOf(i + 2))
                     .build());
+        }
 
-            LOG.info("The result is [key=" + i + ", tuple=" + entry + ']');
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(
+                view.tupleBuilder()
+                    .set("key", Long.valueOf(i))
+                    .build());
 
             assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().put(
-                tbl.kvView().tupleBuilder()
+        for (int i = 0; i < keysCnt; i++) {
+            view.put(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
                     .set("value", Long.valueOf(i + 5))
                     .build());
 
-            Tuple entry = tbl.kvView().get(
-                tbl.kvView().tupleBuilder()
+            Tuple entry = view.get(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
                     .build());
 
             assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            boolean res = tbl.kvView().replace(
-                tbl.kvView().tupleBuilder()
+        HashSet<Tuple> keys = new HashSet<>();
+
+        for (int i = 0; i < keysCnt; i++) {
+            keys.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+        }
+
+        Map<Tuple, Tuple> entries = view.getAll(keys);
+
+        assertEquals(keysCnt, entries.size());
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.replace(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
                     .set("value", Long.valueOf(i + 5))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
                     .set("value", Long.valueOf(i + 2))
                     .build());
 
             assertTrue(res);
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            boolean res = tbl.kvView().remove(
-                tbl.kvView().tupleBuilder()
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.remove(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
                     .build());
 
             assertTrue(res);
 
-            Tuple entry = tbl.kvView().get(
-                tbl.kvView().tupleBuilder()
+            Tuple entry = view.get(
+                view.tupleBuilder()
+                    .set("key", Long.valueOf(i))
+                    .build());
+
+            assertNull(entry);
+        }
+
+        HashMap<Tuple, Tuple> batch = new HashMap<>(keysCnt);
+
+        for (int i = 0; i < keysCnt; i++) {
+            batch.put(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
+                    .build(),
+                view.tupleBuilder()
+                    .set("value", Long.valueOf(i + 2))
                     .build());
+        }
+
+        view.putAll(batch);
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        view.removeAll(keys);
+
+        for (Tuple key : keys) {
+            Tuple entry = view.get(key);
 
             assertNull(entry);
         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index 31a578e..be1c3ac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -35,9 +36,6 @@ import org.jetbrains.annotations.NotNull;
 
 /**
  * Key-value view implementation for binary user-object representation.
- *
- * @implNote Key-value {@link Tuple}s represents marshalled user-objects
- * regarding the binary object concept.
  */
 public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinaryView {
     /** Marshaller. */
@@ -56,12 +54,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple get(Tuple key) {
+    @Override public Tuple get(@NotNull Tuple key) {
         return sync(getAsync(key));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAsync(Tuple key) {
+    @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple key) {
         Objects.requireNonNull(key);
 
         Row kRow = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
@@ -72,27 +70,31 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Tuple, Tuple> getAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Map<Tuple, Tuple> getAll(@NotNull Collection<Tuple> keys) {
+        return sync(getAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@NotNull Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.getAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).collect(Collectors.toMap(TableRow::keyChunk, TableRow::valueChunk)));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean contains(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public boolean contains(@NotNull Tuple key) {
+        return get(key) != null;
     }
 
     /** {@inheritDoc} */
-    @Override public void put(Tuple key, Tuple val) {
+    @Override public void put(@NotNull Tuple key, Tuple val) {
         sync(putAsync(key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> putAsync(Tuple key, Tuple val) {
+    @Override public @NotNull CompletableFuture<Void> putAsync(@NotNull Tuple key, Tuple val) {
         Objects.requireNonNull(key);
 
         Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -101,22 +103,27 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public void putAll(Map<Tuple, Tuple> pairs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public void putAll(@NotNull Map<Tuple, Tuple> pairs) {
+        sync(putAllAsync(pairs));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> putAllAsync(Map<Tuple, Tuple> pairs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<Tuple, Tuple> pairs) {
+        Objects.requireNonNull(pairs);
+
+        return tbl.upsertAll(pairs.entrySet()
+            .stream()
+            .map(this::marshalPair)
+            .collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple getAndPut(Tuple key, Tuple val) {
+    @Override public Tuple getAndPut(@NotNull Tuple key, Tuple val) {
         return sync(getAndPutAsync(key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAndPutAsync(Tuple key, Tuple val) {
+    @Override public @NotNull CompletableFuture<Tuple> getAndPutAsync(@NotNull Tuple key, Tuple val) {
         Objects.requireNonNull(key);
 
         Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -127,14 +134,13 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public boolean putIfAbsent(Tuple key, Tuple val) {
+    @Override public boolean putIfAbsent(@NotNull Tuple key, @NotNull Tuple val) {
         return sync(putIfAbsentAsync(key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(Tuple key, Tuple val) {
+    @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull Tuple key, Tuple val) {
         Objects.requireNonNull(key);
-        Objects.requireNonNull(val);
 
         Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
 
@@ -142,12 +148,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public boolean remove(Tuple key) {
+    @Override public boolean remove(@NotNull Tuple key) {
         return sync(removeAsync(key));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> removeAsync(Tuple key) {
+    @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull Tuple key) {
         Objects.requireNonNull(key);
 
         Row row = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
@@ -156,12 +162,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public boolean remove(Tuple key, Tuple val) {
+    @Override public boolean remove(@NotNull Tuple key, @NotNull Tuple val) {
         return sync(removeAsync(key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> removeAsync(Tuple key, Tuple val) {
+    @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull Tuple key, @NotNull Tuple val) {
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
@@ -171,32 +177,44 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Collection<Tuple> removeAll(@NotNull Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(@NotNull Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+        // Rows passed to deleteAll carry keys only, so the rows it returns are mapped back to key tuples.
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(t -> t.stream().filter(Objects::nonNull).map(TableRow::keyChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Tuple getAndRemove(@NotNull Tuple key) {
+        Objects.requireNonNull(key);
+
+        return sync(getAndRemoveAsync(key));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(@NotNull Tuple key) {
+        Objects.requireNonNull(key);
+
+        return tbl.getAndDelete(marsh.marshal(key, null))
+            .thenApply(this::wrap)
+            .thenApply(t -> t == null ? null : t.valueChunk());
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(Tuple key, Tuple val) {
+    @Override public boolean replace(@NotNull Tuple key, Tuple val) {
         return sync(replaceAsync(key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple key, Tuple val) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple key, Tuple val) {
         Objects.requireNonNull(key);
 
         Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -205,12 +223,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(Tuple key, Tuple oldVal, Tuple newVal) {
+    @Override public boolean replace(@NotNull Tuple key, Tuple oldVal, Tuple newVal) {
         return sync(replaceAsync(key, oldVal, newVal));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple key, Tuple oldVal, Tuple newVal) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple key, Tuple oldVal, Tuple newVal) {
         Objects.requireNonNull(key);
 
         Row oldRow = marshaller().marshal(key, oldVal); // Convert to portable format to pass TX/storage layer.
@@ -220,18 +238,22 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple getAndReplace(Tuple key, Tuple val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Tuple getAndReplace(@NotNull Tuple key, Tuple val) {
+        return sync(getAndReplaceAsync(key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(Tuple key, Tuple val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple key, Tuple val) {
+        Objects.requireNonNull(key);
+
+        return tbl.getAndReplace(marsh.marshal(key, val))
+            .thenApply(this::wrap)
+            .thenApply(t -> t == null ? null : t.valueChunk());
     }
 
     /** {@inheritDoc} */
     @Override public <R extends Serializable> R invoke(
-        Tuple key,
+        @NotNull Tuple key,
         InvokeProcessor<Tuple, Tuple, R> proc,
         Serializable... args
     ) {
@@ -240,7 +262,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
 
     /** {@inheritDoc} */
     @Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
-        Tuple key,
+        @NotNull Tuple key,
         InvokeProcessor<Tuple, Tuple, R> proc,
         Serializable... args
     ) {
@@ -249,7 +271,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
 
     /** {@inheritDoc} */
     @Override public <R extends Serializable> Map<Tuple, R> invokeAll(
-        Collection<Tuple> keys,
+        @NotNull Collection<Tuple> keys,
         InvokeProcessor<Tuple, Tuple, R> proc,
         Serializable... args
     ) {
@@ -258,7 +280,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
 
     /** {@inheritDoc} */
     @Override public @NotNull <R extends Serializable> CompletableFuture<Map<Tuple, R>> invokeAllAsync(
-        Collection<Tuple> keys,
+        @NotNull Collection<Tuple> keys,
         InvokeProcessor<Tuple, Tuple, R> proc,
         Serializable... args
     ) {
@@ -289,4 +311,27 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
 
         return new TableRow(schema, new Row(schema, row));
     }
+
+    /**
+     * Wraps each binary row of the given collection into a table row.
+     *
+     * @param rows Binary rows, may be {@code null}.
+     * @return Set of wrapped table rows, or {@code null} if {@code rows} is {@code null}.
+     */
+    private Collection<TableRow> wrap(Collection<BinaryRow> rows) {
+        if (rows != null)
+            return rows.stream().map(row -> wrap(row)).collect(Collectors.toSet());
+
+        return null;
+    }
+
+    /**
+     * Marshals a single key-value pair into a row using this view's marshaller.
+     *
+     * @param pair Map entry whose key and value are passed to the marshaller as the key and value tuples.
+     * @return Row produced by the marshaller for the given pair.
+     */
+    private Row marshalPair(Map.Entry<Tuple, Tuple> pair) {
+        return marshaller().marshal(pair.getKey(), pair.getValue());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index 6d3d10b..c744e93 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -51,12 +51,12 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
     }
 
     /** {@inheritDoc} */
-    @Override public V get(K key) {
+    @Override public V get(@NotNull K key) {
         return sync(getAsync(key));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<V> getAsync(K key) {
+    @Override public @NotNull CompletableFuture<V> getAsync(@NotNull K key) {
         Objects.requireNonNull(key);
 
         final KVSerializer<K, V> marsh = marshaller();
@@ -69,138 +69,138 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
     }
 
     /** {@inheritDoc} */
-    @Override public Map<K, V> getAll(Collection<K> keys) {
+    @Override public Map<K, V> getAll(@NotNull Collection<K> keys) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+    @Override public @NotNull CompletableFuture<Map<K, V>> getAllAsync(@NotNull Collection<K> keys) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean contains(K key) {
+    @Override public boolean contains(@NotNull K key) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public void put(K key, V val) {
+    @Override public void put(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> putAsync(K key, V val) {
+    @Override public @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public void putAll(Map<K, V> pairs) {
+    @Override public void putAll(@NotNull Map<K, V> pairs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> putAllAsync(Map<K, V> pairs) {
+    @Override public @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<K, V> pairs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndPut(K key, V val) {
+    @Override public V getAndPut(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<V> getAndPutAsync(K key, V val) {
+    @Override public @NotNull CompletableFuture<V> getAndPutAsync(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean putIfAbsent(K key, V val) {
+    @Override public boolean putIfAbsent(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(K key, V val) {
+    @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean remove(K key) {
+    @Override public boolean remove(@NotNull K key) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> removeAsync(K key) {
+    @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean remove(K key, V val) {
+    @Override public boolean remove(@NotNull K key, @NotNull V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> removeAsync(K key, V val) {
+    @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key, @NotNull V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<K> removeAll(Collection<K> keys) {
+    @Override public Collection<K> removeAll(@NotNull Collection<K> keys) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<K> removeAllAsync(Collection<K> keys) {
+    @Override public @NotNull CompletableFuture<Collection<K>> removeAllAsync(@NotNull Collection<K> keys) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndRemove(K key) {
+    @Override public V getAndRemove(@NotNull K key) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<V> getAndRemoveAsync(K key) {
+    @Override public @NotNull CompletableFuture<V> getAndRemoveAsync(@NotNull K key) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(K key, V val) {
+    @Override public boolean replace(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(K key, V val) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(K key, V oldVal, V newVal) {
+    @Override public boolean replace(@NotNull K key, V oldVal, V newVal) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V oldVal, V newVal) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndReplace(K key, V val) {
+    @Override public V getAndReplace(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<V> getAndReplaceAsync(K key, V val) {
+    @Override public @NotNull CompletableFuture<V> getAndReplaceAsync(@NotNull K key, V val) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public <R extends Serializable> R invoke(K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
+    @Override public <R extends Serializable> R invoke(@NotNull K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
-        K key,
+        @NotNull K key,
         InvokeProcessor<K, V, R> proc,
         Serializable... args
     ) {
@@ -209,7 +209,7 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
 
     /** {@inheritDoc} */
     @Override public <R extends Serializable> Map<K, R> invokeAll(
-        Collection<K> keys,
+        @NotNull Collection<K> keys,
         InvokeProcessor<K, V, R> proc,
         Serializable... args
     ) {
@@ -218,7 +218,7 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
 
     /** {@inheritDoc} */
     @Override public @NotNull <R extends Serializable> CompletableFuture<Map<K, R>> invokeAllAsync(
-        Collection<K> keys,
+        @NotNull Collection<K> keys,
         InvokeProcessor<K, V, R> proc, Serializable... args
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index f442bbb..27bdc44 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -66,12 +66,12 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
     }
 
     /** {@inheritDoc} */
-    @Override public R get(R keyRec) {
+    @Override public R get(@NotNull R keyRec) {
         return sync(getAsync(keyRec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<R> getAsync(R keyRec) {
+    @Override public @NotNull CompletableFuture<R> getAsync(@NotNull R keyRec) {
         Objects.requireNonNull(keyRec);
 
         RecordSerializer<R> marsh = serializer();
@@ -84,159 +84,161 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<R> getAll(Collection<R> keyRecs) {
+    @Override public Collection<R> getAll(@NotNull Collection<R> keyRecs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<R>> getAllAsync(Collection<R> keyRecs) {
+    @Override public @NotNull CompletableFuture<Collection<R>> getAllAsync(@NotNull Collection<R> keyRecs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public void upsert(R rec) {
+    @Override public void upsert(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> upsertAsync(R rec) {
+    @Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public void upsertAll(Collection<R> recs) {
+    @Override public void upsertAll(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> upsertAllAsync(Collection<R> recs) {
+    @Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public R getAndUpsert(R rec) {
+    @Override public R getAndUpsert(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<R> getAndUpsertAsync(R rec) {
+    @Override public @NotNull CompletableFuture<R> getAndUpsertAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean insert(R rec) {
+    @Override public boolean insert(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> insertAsync(R rec) {
+    @Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<R> insertAll(Collection<R> recs) {
+    @Override public Collection<R> insertAll(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<R>> insertAllAsync(Collection<R> recs) {
+    @Override public @NotNull CompletableFuture<Collection<R>> insertAllAsync(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(R rec) {
+    @Override public boolean replace(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(R rec) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(R oldRec, R newRec) {
+    @Override public boolean replace(@NotNull R oldRec, @NotNull R newRec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(R oldRec, R newRec) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R oldRec, @NotNull R newRec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public R getAndReplace(R rec) {
+    @Override public R getAndReplace(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<R> getAndReplaceAsync(R rec) {
+    @Override public @NotNull CompletableFuture<R> getAndReplaceAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean delete(R keyRec) {
+    @Override public boolean delete(@NotNull R keyRec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> deleteAsync(R keyRec) {
+    @Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull R keyRec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean deleteExact(R rec) {
+    @Override public boolean deleteExact(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(R rec) {
+    @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public R getAndDelete(R rec) {
+    @Override public R getAndDelete(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<R> getAndDeleteAsync(R rec) {
+    @Override public @NotNull CompletableFuture<R> getAndDeleteAsync(@NotNull R rec) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<R> deleteAll(Collection<R> recs) {
+    @Override public Collection<R> deleteAll(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(Collection<R> recs) {
+    @Override public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<R> deleteAllExact(Collection<R> recs) {
+    @Override public Collection<R> deleteAllExact(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs) {
+    @Override public @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(@NotNull Collection<R> recs) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public <T extends Serializable> T invoke(R keyRec, InvokeProcessor<R, R, T> proc) {
+    @Override public <T extends Serializable> T invoke(@NotNull R keyRec, InvokeProcessor<R, R, T> proc) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(R keyRec,
-        InvokeProcessor<R, R, T> proc) {
+    @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(
+        @NotNull R keyRec,
+        InvokeProcessor<R, R, T> proc
+    ) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override public <T extends Serializable> Map<R, T> invokeAll(
-        Collection<R> keyRecs,
+        @NotNull Collection<R> keyRecs,
         InvokeProcessor<R, R, T> proc
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
@@ -244,7 +246,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
 
     /** {@inheritDoc} */
     @Override public @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(
-        Collection<R> keyRecs,
+        @NotNull Collection<R> keyRecs,
         InvokeProcessor<R, R, T> proc
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 58ba9cc..a7cee35 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.table;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -92,12 +94,12 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple get(Tuple keyRec) {
+    @Override public Tuple get(@NotNull Tuple keyRec) {
         return sync(getAsync(keyRec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAsync(Tuple keyRec) {
+    @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) {
         Objects.requireNonNull(keyRec);
 
         final Row keyRow = marshaller().marshal(keyRec, null); // Convert to portable format to pass TX/storage layer.
@@ -106,22 +108,32 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Tuple> getAll(Collection<Tuple> keyRecs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) {
+        return sync(getAllAsync(keyRecs));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(Collection<Tuple> keyRecs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@NotNull Collection<Tuple> keyRecs) {
+        Objects.requireNonNull(keyRecs);
+
+        HashSet<BinaryRow> keys = new HashSet<>(keyRecs.size());
+
+        for (Tuple keyRec : keyRecs) {
+            final Row keyRow = marshaller().marshal(keyRec, null);
+
+            keys.add(keyRow);
+        }
+
+        return tbl.getAll(keys).thenApply(this::wrap);
     }
 
     /** {@inheritDoc} */
-    @Override public void upsert(Tuple rec) {
+    @Override public void upsert(@NotNull Tuple rec) {
         sync(upsertAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> upsertAsync(Tuple rec) {
+    @Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull Tuple rec) {
         Objects.requireNonNull(rec);
 
         final Row keyRow = marshaller().marshal(rec);
@@ -130,22 +142,32 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public void upsertAll(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public void upsertAll(@NotNull Collection<Tuple> recs) {
+        sync(upsertAllAsync(recs));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Void> upsertAllAsync(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<Tuple> recs) {
+        Objects.requireNonNull(recs);
+
+        HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+        for (Tuple keyRec : recs) {
+            final Row keyRow = marshaller().marshal(keyRec);
+
+            keys.add(keyRow);
+        }
+
+        return tbl.upsertAll(keys);
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple getAndUpsert(Tuple rec) {
+    @Override public Tuple getAndUpsert(@NotNull Tuple rec) {
         return sync(getAndUpsertAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(Tuple rec) {
+    @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(@NotNull Tuple rec) {
         Objects.requireNonNull(rec);
 
         final Row keyRow = marshaller().marshal(rec);
@@ -154,12 +176,12 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean insert(Tuple rec) {
+    @Override public boolean insert(@NotNull Tuple rec) {
         return sync(insertAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> insertAsync(Tuple rec) {
+    @Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull Tuple rec) {
         Objects.requireNonNull(rec);
 
         final Row keyRow = marshaller().marshal(rec);
@@ -168,22 +190,32 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Tuple> insertAll(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Collection<Tuple> insertAll(@NotNull Collection<Tuple> recs) {
+        return sync(insertAllAsync(recs));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@NotNull Collection<Tuple> recs) {
+        Objects.requireNonNull(recs);
+
+        HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+        for (Tuple keyRec : recs) {
+            final Row keyRow = marshaller().marshal(keyRec);
+
+            keys.add(keyRow);
+        }
+
+        return tbl.insertAll(keys).thenApply(this::wrap);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(Tuple rec) {
+    @Override public boolean replace(@NotNull Tuple rec) {
         return sync(replaceAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple rec) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple rec) {
         Objects.requireNonNull(rec);
 
         final Row keyRow = marshaller().marshal(rec);
@@ -192,12 +224,12 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(Tuple oldRec, Tuple newRec) {
+    @Override public boolean replace(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
         return sync(replaceAsync(oldRec, newRec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple oldRec, Tuple newRec) {
+    @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
         Objects.requireNonNull(oldRec);
         Objects.requireNonNull(newRec);
 
@@ -208,22 +240,26 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple getAndReplace(Tuple rec) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Tuple getAndReplace(@NotNull Tuple rec) {
+        return sync(getAndReplaceAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(Tuple rec) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple rec) {
+        Objects.requireNonNull(rec);
+
+        final Row keyRow = marshaller().marshal(rec);
+
+        return tbl.getAndReplace(keyRow).thenApply(this::wrap);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean delete(Tuple keyRec) {
+    @Override public boolean delete(@NotNull Tuple keyRec) {
         return sync(deleteAsync(keyRec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> deleteAsync(Tuple keyRec) {
+    @Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull Tuple keyRec) {
         Objects.requireNonNull(keyRec);
 
         final Row keyRow = marshaller().marshal(keyRec, null);
@@ -232,12 +268,12 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean deleteExact(Tuple rec) {
+    @Override public boolean deleteExact(@NotNull Tuple rec) {
         return sync(deleteExactAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(Tuple rec) {
+    @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull Tuple rec) {
         Objects.requireNonNull(rec);
 
         final Row row = marshaller().marshal(rec);
@@ -246,39 +282,64 @@ public class TableImpl extends AbstractTableView implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public Tuple getAndDelete(Tuple rec) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Tuple getAndDelete(@NotNull Tuple rec) {
+        return sync(getAndDeleteAsync(rec));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(Tuple rec) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(@NotNull Tuple rec) {
+        Objects.requireNonNull(rec);
+
+        final Row row = marshaller().marshal(rec);
+
+        return tbl.getAndDelete(row).thenApply(this::wrap);
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Tuple> deleteAll(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Collection<Tuple> deleteAll(@NotNull Collection<Tuple> recs) {
+        return sync(deleteAllAsync(recs));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@NotNull Collection<Tuple> recs) {
+        Objects.requireNonNull(recs);
+
+        HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+        for (Tuple keyRec : recs) {
+            final Row keyRow = marshaller().marshal(keyRec, null);
+
+            keys.add(keyRow);
+        }
+
+        return tbl.deleteAll(keys).thenApply(this::wrap);
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Tuple> deleteAllExact(Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public Collection<Tuple> deleteAllExact(@NotNull Collection<Tuple> recs) {
+        return sync(deleteAllExactAsync(recs));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(
-        Collection<Tuple> recs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        @NotNull Collection<Tuple> recs
+    ) {
+        Objects.requireNonNull(recs);
+
+        HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+        for (Tuple keyRec : recs) {
+            final Row keyRow = marshaller().marshal(keyRec);
+
+            keys.add(keyRow);
+        }
+
+        return tbl.deleteAllExact(keys).thenApply(this::wrap);
     }
 
     /** {@inheritDoc} */
     @Override public <T extends Serializable> T invoke(
-        Tuple keyRec,
+        @NotNull Tuple keyRec,
         InvokeProcessor<Tuple, Tuple, T> proc
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
@@ -286,7 +347,7 @@ public class TableImpl extends AbstractTableView implements Table {
 
     /** {@inheritDoc} */
     @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(
-        Tuple keyRec,
+        @NotNull Tuple keyRec,
         InvokeProcessor<Tuple, Tuple, T> proc
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
@@ -294,7 +355,7 @@ public class TableImpl extends AbstractTableView implements Table {
 
     /** {@inheritDoc} */
     @Override public <T extends Serializable> Map<Tuple, T> invokeAll(
-        Collection<Tuple> keyRecs,
+        @NotNull Collection<Tuple> keyRecs,
         InvokeProcessor<Tuple, Tuple, T> proc
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
@@ -302,7 +363,7 @@ public class TableImpl extends AbstractTableView implements Table {
 
     /** {@inheritDoc} */
     @Override public @NotNull <T extends Serializable> CompletableFuture<Map<Tuple, T>> invokeAllAsync(
-        Collection<Tuple> keyRecs,
+        @NotNull Collection<Tuple> keyRecs,
         InvokeProcessor<Tuple, Tuple, T> proc
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
@@ -332,4 +393,15 @@ public class TableImpl extends AbstractTableView implements Table {
 
         return new TableRow(schema, new Row(schema, row));
     }
+
+    /**
+     * @param rows Binary rows.
+     * @return Table rows.
+     */
+    private Collection<Tuple> wrap(Collection<BinaryRow> rows) {
+        if (rows == null)
+            return null;
+
+        return rows.stream().map(this::wrap).collect(Collectors.toSet());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
new file mode 100644
index 0000000..054df56
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * This is a utility class for serializing cache tuples. It will be removed after another way of serialization is
+ * implemented in the network layer.
+ * TODO: Remove it after (IGNITE-14793)
+ */
+public class CommandUtils {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(CommandUtils.class);
+
+    /**
+     * Writes a collection of rows to a byte array as a sequence of length-prefixed records:
+     * a 4-byte length followed by the bytes of the row. A {@code null} or empty collection is ignored
+     * and the consumer is not invoked. Null elements are skipped.
+     *
+     * @param rows Collection of rows.
+     * @param consumer Byte array consumer; receives {@code null} if any row could not be written.
+     */
+    public static void rowsToBytes(Collection<BinaryRow> rows, Consumer<byte[]> consumer) {
+        if (rows == null || rows.isEmpty())
+            return;
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            for (BinaryRow row : rows) {
+                if (row == null)
+                    continue;
+
+                // Serialize inline: a failure must abort the whole batch instead of leaving a partial payload.
+                try (ByteArrayOutputStream rowBaos = new ByteArrayOutputStream()) {
+                    row.writeTo(rowBaos);
+
+                    baos.write(intToBytes(rowBaos.size()));
+
+                    rowBaos.writeTo(baos);
+                }
+            }
+
+            consumer.accept(baos.toByteArray());
+        }
+        catch (IOException e) {
+            LOG.error("Could not write rows to stream [rows=" + rows.size() + ']', e);
+
+            consumer.accept(null);
+        }
+    }
+
+    /**
+     * Serializes a single row into a byte array and hands the result to the consumer.
+     * A {@code null} row is ignored and the consumer is not invoked.
+     *
+     * @param row Row.
+     * @param consumer Byte array consumer; receives {@code null} if the row could not be written.
+     */
+    public static void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+        if (row != null) {
+            try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                row.writeTo(out);
+
+                out.flush();
+
+                consumer.accept(out.toByteArray());
+            }
+            catch (IOException e) {
+                LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+                consumer.accept(null);
+            }
+        }
+    }
+
+    /**
+     * Reads rows from a byte array in the format written by {@link #rowsToBytes(Collection, Consumer)}:
+     * a sequence of records, each a 4-byte length followed by that many row bytes. A {@code null} or empty
+     * array is ignored. A malformed record is logged and stops the reading; rows read before it are kept.
+     *
+     * @param bytes Byte array.
+     * @param consumer Consumer for binary row.
+     */
+    public static void readRows(byte[] bytes, Consumer<BinaryRow> consumer) {
+        if (bytes == null || bytes.length == 0)
+            return;
+
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+            byte[] lenBytes = new byte[4];
+            int read;
+
+            while ((read = bais.read(lenBytes)) != -1) {
+                // Explicit checks instead of asserts: assertions are normally disabled at runtime.
+                if (read != lenBytes.length)
+                    throw new IOException("Truncated row length [read=" + read + ']');
+
+                int len = bytesToInt(lenBytes);
+
+                if (len <= 0 || len > bais.available())
+                    throw new IOException("Invalid row length [len=" + len + ", available=" + bais.available() + ']');
+
+                byte[] rowBytes = new byte[len];
+                // Cannot be a short read: len <= available was checked above.
+                bais.read(rowBytes);
+
+                consumer.accept(new ByteBufferRow(rowBytes));
+            }
+        }
+        catch (IOException e) {
+            LOG.error("Could not read rows from stream.", e);
+        }
+    }
+
+    /**
+     * Encodes an integer as four bytes in big-endian order (the {@link ByteBuffer} default).
+     *
+     * @param i Integer value.
+     * @return Array of 4 bytes, most significant byte first.
+     */
+    private static byte[] intToBytes(int i) {
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
+
+        return buf.putInt(i).array();
+    }
+
+    /**
+     * Deserializes an integer from a byte array; the inverse of {@code intToBytes}.
+     *
+     * @param bytes Byte array with a big-endian integer in its first 4 bytes.
+     * @return Integer value.
+     */
+    private static int bytesToInt(byte[] bytes) {
+        return ByteBuffer.wrap(bytes).getInt();
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
new file mode 100644
index 0000000..167b58d
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command deletes the entries identified by the passed keys.
+ */
+public class DeleteAllCommand implements WriteCommand {
+    /** Binary key rows. The field is transient; when it is {@code null}, the getter rebuilds it from the bytes. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Serialized rows, filled in by the constructor.
+     * This is a temporary solution until the network layer implements proper serialization of BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * Creates a new instance of DeleteAllCommand with the given set of keys to be deleted.
+     * The {@code keyRows} should not be {@code null} or empty.
+     *
+     * @param keyRows Set of binary key rows to be deleted.
+     */
+    public DeleteAllCommand(@NotNull Set<BinaryRow> keyRows) {
+        assert keyRows != null && !keyRows.isEmpty();
+
+        this.rows = keyRows;
+
+        CommandUtils.rowsToBytes(keyRows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * Returns the set of binary key rows to be deleted, reading it from the serialized bytes if it is not set yet.
+     *
+     * @return Binary keys, or {@code null} if neither the rows nor their serialized bytes are available.
+     */
+    public Set<BinaryRow> getRows() {
+        if (rows == null && rowsBytes != null) {
+            rows = new HashSet<>();
+
+            CommandUtils.readRows(rowsBytes, rows::add);
+        }
+
+        return rows;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
index eabe187..f035eb8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
  * The command deletes a entry by passed key.
  */
 public class DeleteCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
-
-    /** Key row. */
+    /** Binary key row. */
     private transient BinaryRow keyRow;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] keyRowBytes;
 
     /**
-     * @param keyRow Key row.
+     * Creates a new instance of DeleteCommand with the given key to be deleted.
+     * The {@code keyRow} should not be {@code null}.
+     *
+     * @param keyRow Binary key row.
      */
     public DeleteCommand(@NotNull BinaryRow keyRow) {
         assert keyRow != null;
 
         this.keyRow = keyRow;
 
-        rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
     }
 
     /**
-     * Gets a key row.
+     * Gets a binary key row to be deleted.
      *
-     * @return Key row.
+     * @return Binary key.
      */
     public BinaryRow getKeyRow() {
         if (keyRow == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
new file mode 100644
index 0000000..65846a1
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command deletes the entries that are exactly the same as the rows passed.
+ */
+public class DeleteExactAllCommand implements WriteCommand {
+    /** Binary rows. The field is transient; when it is {@code null}, the getter rebuilds it from the bytes. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Serialized rows, filled in by the constructor.
+     * This is a temporary solution until the network layer implements proper serialization of BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * Creates a new instance of DeleteExactAllCommand with the given set of rows to be deleted.
+     * The {@code rows} should not be {@code null} or empty.
+     *
+     * @param rows Binary rows.
+     */
+    public DeleteExactAllCommand(@NotNull Set<BinaryRow> rows) {
+        assert rows != null && !rows.isEmpty();
+
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * Gets the set of binary rows to be deleted, reading it from the serialized bytes if it is not set yet.
+     *
+     * @return Binary rows, or {@code null} if neither the rows nor their serialized bytes are available.
+     */
+    public Set<BinaryRow> getRows() {
+        if (rows == null && rowsBytes != null) {
+            rows = new HashSet<>();
+
+            CommandUtils.readRows(rowsBytes, rows::add);
+        }
+
+        return rows;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
index be6ddcc..ae50136 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
@@ -17,68 +17,43 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * The command inserts a row.
+ * The command deletes an entry that is exactly the same as the row passed.
  */
-public class InsertCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
-    /** Row. */
+public class DeleteExactCommand implements WriteCommand {
+    /** Binary row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
     /**
-     * @param row Row.
+     * Creates a new instance of DeleteExactCommand with the given row to be deleted.
+     * The {@code row} should not be {@code null}.
+     *
+     * @param row Binary row.
      */
-    public InsertCommand(@NotNull BinaryRow row) {
+    public DeleteExactCommand(@NotNull BinaryRow row) {
         assert row != null;
 
         this.row = row;
 
-        rowToBytes(row, bytes -> rowBytes = bytes);
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * Writes a row to byte array.
+     * Gets a binary row to be deleted.
      *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
-    }
-
-    /**
-     * Gets a data row.
-     *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getRow() {
         if (row == null)
@@ -86,4 +61,5 @@ public class InsertCommand implements WriteCommand {
 
         return row;
     }
+
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAllCommand.java
new file mode 100644
index 0000000..72eeddb
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * This is a command for the batch get operation.
+ */
+public class GetAllCommand implements ReadCommand {
+    /** Binary key rows. The field is transient; when it is {@code null}, the getter rebuilds it from the bytes. */
+    private transient Set<BinaryRow> keyRows;
+
+    /*
+     * Serialized key rows, filled in by the constructor.
+     * This is a temporary solution until the network layer implements proper serialization of BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] keyRowsBytes;
+
+    /**
+     * Creates a new instance of GetAllCommand with the given keys to be looked up.
+     * The {@code keyRows} should not be {@code null} or empty.
+     *
+     * @param keyRows Binary key rows.
+     */
+    public GetAllCommand(@NotNull Set<BinaryRow> keyRows) {
+        assert keyRows != null && !keyRows.isEmpty();
+
+        this.keyRows = keyRows;
+
+        CommandUtils.rowsToBytes(keyRows, bytes -> keyRowsBytes = bytes);
+    }
+
+    /**
+     * Gets the set of binary key rows to be looked up, reading it from the serialized bytes if it is not set yet.
+     *
+     * @return Binary keys, or {@code null} if neither the rows nor their serialized bytes are available.
+     */
+    public Set<BinaryRow> getKeyRows() {
+        if (keyRows == null && keyRowsBytes != null) {
+            keyRows = new HashSet<>();
+
+            CommandUtils.readRows(keyRowsBytes, keyRows::add);
+        }
+
+        return keyRows;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndDeleteCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndDeleteCommand.java
index eabe187..bf4e40b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndDeleteCommand.java
@@ -17,68 +17,43 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * The command deletes a entry by passed key.
+ * This is a command to get a value before deleting it.
  */
-public class DeleteCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
-
-    /** Key row. */
+public class GetAndDeleteCommand implements WriteCommand {
+    /** Binary key row. */
     private transient BinaryRow keyRow;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] keyRowBytes;
 
     /**
-     * @param keyRow Key row.
+     * Creates a new instance of GetAndDeleteCommand with the given key to be got and deleted.
+     * The {@code keyRow} should not be {@code null}.
+     *
+     * @param keyRow Binary key row.
      */
-    public DeleteCommand(@NotNull BinaryRow keyRow) {
+    public GetAndDeleteCommand(@NotNull BinaryRow keyRow) {
         assert keyRow != null;
 
         this.keyRow = keyRow;
 
-        rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
+        CommandUtils.rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
     }
 
     /**
-     * Writes a row to byte array.
+     * Gets a binary key row to be got and deleted.
      *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
-    }
-
-    /**
-     * Gets a key row.
-     *
-     * @return Key row.
+     * @return Binary key.
      */
     public BinaryRow getKeyRow() {
         if (keyRow == null)
@@ -86,5 +61,4 @@ public class DeleteCommand implements WriteCommand {
 
         return keyRow;
     }
-
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndReplaceCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndReplaceCommand.java
index be6ddcc..d70ebe1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndReplaceCommand.java
@@ -17,68 +17,43 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * The command inserts a row.
+ * This is a command to get a value before replacing it.
  */
-public class InsertCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
-    /** Row. */
+public class GetAndReplaceCommand implements WriteCommand {
+    /** Binary row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
     /**
-     * @param row Row.
+     * Creates a new instance of GetAndReplaceCommand with the given row to be got and replaced.
+     * The {@code row} should not be {@code null}.
+     *
+     * @param row Binary row.
      */
-    public InsertCommand(@NotNull BinaryRow row) {
+    public GetAndReplaceCommand(@NotNull BinaryRow row) {
         assert row != null;
 
         this.row = row;
 
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * Gets a data row.
+     * Gets a binary row to be got and replaced.
      *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getRow() {
         if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndUpsertCommand.java
similarity index 55%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndUpsertCommand.java
index eabe187..55572f6 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndUpsertCommand.java
@@ -17,68 +17,43 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * The command deletes a entry by passed key.
+ * This is a command to get a value before upserting it.
  */
-public class DeleteCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
-
-    /** Key row. */
+public class GetAndUpsertCommand implements WriteCommand {
+    /** Binary key row. */
     private transient BinaryRow keyRow;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] keyRowBytes;
 
     /**
-     * @param keyRow Key row.
-     */
-    public DeleteCommand(@NotNull BinaryRow keyRow) {
-        assert keyRow != null;
-
-        this.keyRow = keyRow;
-
-        rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
+     * Creates a new instance of GetAndUpsertCommand with the given row to be got and upserted.
+     * The {@code row} should not be {@code null}.
      *
-     * @param row Row.
-     * @param consumer Byte array consumer.
+     * @param row Binary row.
      */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
+    public GetAndUpsertCommand(@NotNull BinaryRow row) {
+        assert row != null;
 
-            baos.flush();
+        this.keyRow = row;
 
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(row, bytes -> keyRowBytes = bytes);
     }
 
     /**
-     * Gets a key row.
+     * Gets a binary key row to be got and upserted.
      *
-     * @return Key row.
+     * @return Binary key.
      */
     public BinaryRow getKeyRow() {
         if (keyRow == null)
@@ -86,5 +61,4 @@ public class DeleteCommand implements WriteCommand {
 
         return keyRow;
     }
-
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
index 597aba3..286257b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.ReadCommand;
 import org.jetbrains.annotations.NotNull;
 
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
  * The command gets a value by key specified.
  */
 public class GetCommand implements ReadCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
-    /** Key row. */
+    /** Binary key row. */
     private transient BinaryRow keyRow;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] keyRowBytes;
 
     /**
-     * @param keyRow Key row.
+     * Creates a new instance of GetCommand with the given key to be got.
+     * The {@code keyRow} should not be {@code null}.
+     *
+     * @param keyRow Binary key row.
      */
     public GetCommand(@NotNull BinaryRow keyRow) {
         assert keyRow != null;
 
         this.keyRow = keyRow;
 
-        rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
     }
 
     /**
-     * Gets a key row.
+     * Gets a binary key row to be got.
      *
-     * @return Key row.
+     * @return Binary key.
      */
     public BinaryRow getKeyRow() {
         if (keyRow == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java
new file mode 100644
index 0000000..1dbad79
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command inserts a batch of rows.
+ */
+public class InsertAllCommand implements WriteCommand {
+    /** Binary rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * Creates a new instance of InsertAllCommand with the given rows to be inserted.
+     * The {@code rows} should not be {@code null} or empty.
+     *
+     * @param rows Binary rows.
+     */
+    public InsertAllCommand(@NotNull Set<BinaryRow> rows) {
+        assert rows != null && !rows.isEmpty();
+
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * Gets a set of binary rows to be inserted.
+     *
+     * @return Binary rows.
+     */
+    public Set<BinaryRow> getRows() {
+        if (rows == null && rowsBytes != null) {
+            rows = new HashSet<>();
+
+            CommandUtils.readRows(rowsBytes, rows::add);
+        }
+
+        return rows;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
index be6ddcc..b12e467 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
  * The command inserts a row.
  */
 public class InsertCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
-    /** Row. */
+    /** Binary row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
     /**
-     * @param row Row.
+     * Creates a new instance of InsertCommand with the given row to be inserted.
+     * The {@code row} should not be {@code null}.
+     *
+     * @param row Binary row.
      */
     public InsertCommand(@NotNull BinaryRow row) {
         assert row != null;
 
         this.row = row;
 
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * Gets a data row.
+     * Gets a binary row to be inserted.
      *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getRow() {
         if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
index 03c83a2..298484f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
@@ -30,31 +26,31 @@ import org.jetbrains.annotations.NotNull;
  * The command replaces an old entry to a new one.
  */
 public class ReplaceCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(ReplaceCommand.class);
-
-    /** Row. */
+    /** Replacing binary row. */
     private transient BinaryRow row;
 
-    /** Old row. */
+    /** Replaced binary row. */
     private transient BinaryRow oldRow;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
     /**
      * Old row bytes.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] oldRowBytes;
 
     /**
-     * @param oldRow Old row.
-     * @param row Row.
+     * Creates a new instance of ReplaceCommand with the given old row and the row that replaces it.
+     * Both rows should not be {@code null}.
+     *
+     * @param oldRow Old binary row.
+     * @param row Binary row.
      */
     public ReplaceCommand(@NotNull BinaryRow oldRow, @NotNull BinaryRow row) {
         assert oldRow != null;
@@ -63,35 +59,14 @@ public class ReplaceCommand implements WriteCommand {
         this.oldRow = oldRow;
         this.row = row;
 
-        rowToBytes(oldRow, bytes -> oldRowBytes = bytes);
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(oldRow, bytes -> oldRowBytes = bytes);
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * Gets a data row.
+     * Gets the binary row that will be stored after the replace.
      *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getRow() {
         if (row == null)
@@ -101,9 +76,9 @@ public class ReplaceCommand implements WriteCommand {
     }
 
     /**
-     * Gets an old row.
+     * Gets the binary row that is expected to be present before the replace.
      *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getOldRow() {
         if (oldRow == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceIfExistCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceIfExistCommand.java
index be6ddcc..4f20ced 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceIfExistCommand.java
@@ -17,68 +17,43 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * The command inserts a row.
+ * The command replaces an existing entry with a new one.
  */
-public class InsertCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
-    /** Row. */
+public class ReplaceIfExistCommand implements WriteCommand {
+    /** Binary row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
     /**
-     * @param row Row.
+     * Creates a new instance of ReplaceIfExistCommand with the given row to be replaced.
+     * The {@code row} should not be {@code null}.
+     *
+     * @param row Binary row.
      */
-    public InsertCommand(@NotNull BinaryRow row) {
+    public ReplaceIfExistCommand(@NotNull BinaryRow row) {
         assert row != null;
 
         this.row = row;
 
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * Gets a data row.
+     * Gets a binary row to be replaced.
      *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getRow() {
         if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java
new file mode 100644
index 0000000..4f6ed5f
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command puts a batch of rows.
+ */
+public class UpsertAllCommand implements WriteCommand {
+    /** Binary rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * Creates a new instance of UpsertAllCommand with the given rows to be upserted.
+     * The {@code rows} should not be {@code null} or empty.
+     *
+     * @param rows Binary rows.
+     */
+    public UpsertAllCommand(@NotNull Set<BinaryRow> rows) {
+        assert rows != null && !rows.isEmpty();
+
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * Gets a set of binary rows to be upserted.
+     *
+     * @return Binary rows.
+     */
+    public Set<BinaryRow> getRows() {
+        if (rows == null && rowsBytes != null) {
+            rows = new HashSet<>();
+
+            CommandUtils.readRows(rowsBytes, rows::add);
+        }
+
+        return rows;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
index cc46aca..9f2e1ef 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
  * The command inserts or updates a value for the key specified.
  */
 public class UpsertCommand implements WriteCommand {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(UpsertCommand.class);
-
-    /** Row. */
+    /** Binary row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
     /**
-     * @param row Row.
+     * Creates a new instance of UpsertCommand with the given row to be upserted.
+     * The {@code row} should not be {@code null}.
+     *
+     * @param row Binary row.
      */
     public UpsertCommand(@NotNull BinaryRow row) {
         assert row != null;
 
         this.row = row;
 
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row Row.
-     * @param consumer Byte array consumer.
-     */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
-
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * Gets a data row.
+     * Gets a binary row to be upserted.
      *
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getRow() {
         if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
new file mode 100644
index 0000000..7c9d969
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+
+/**
+ * This class represents a response object that contains a collection of {@link BinaryRow}s from a batch operation.
+ * @see GetAllCommand
+ * @see DeleteAllCommand
+ * @see InsertAllCommand
+ * @see DeleteExactAllCommand
+ */
+public class MultiRowsResponse implements Serializable {
+    /** Binary rows. */
+    private transient Collection<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * Creates a new instance of MultiRowsResponse with the given collection of binary rows.
+     *
+     * @param rows Collection of binary rows.
+     */
+    public MultiRowsResponse(Collection<BinaryRow> rows) {
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * @return Binary rows.
+     */
+    public Collection<BinaryRow> getValues() {
+        if (rows == null && rowsBytes != null) {
+            rows = new HashSet<>();
+
+            CommandUtils.readRows(rowsBytes, rows::add);
+        }
+
+        return rows;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
similarity index 56%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java
rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
index 803cee1..4465fac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
@@ -17,67 +17,46 @@
 
 package org.apache.ignite.internal.table.distributed.command.response;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
-import org.apache.ignite.lang.IgniteLogger;
 
 /**
- * It is a response object for handling a table get command.
+ * This class represents a response object message that contains a single {@link BinaryRow}.
+ * @see GetCommand
+ * @see GetAndDeleteCommand
+ * @see GetAndUpsertCommand
+ * @see GetAndReplaceCommand
  */
-public class KVGetResponse implements Serializable {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
-    /** Row. */
+public class SingleRowResponse implements Serializable {
+    /** Binary row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
-    public KVGetResponse(BinaryRow row) {
-        this.row = row;
-
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
     /**
-     * Writes a row to byte array.
+     * Creates a new instance of SingleRowResponse with the given binary row.
      *
-     * @param row Row.
-     * @param consumer Byte array consumer.
+     * @param row Binary row.
      */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        if (row == null) {
-            consumer.accept(null);
-
-            return;
-        }
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
+    public SingleRowResponse(BinaryRow row) {
+        this.row = row;
 
-            consumer.accept(null);
-        }
+        CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
     }
 
     /**
-     * @return Data row.
+     * @return Binary row.
      */
     public BinaryRow getValue() {
         if (row == null && rowBytes != null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 5f49636..34141e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,15 +19,29 @@ package org.apache.ignite.internal.table.distributed.raft;
 
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
 import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
-import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
 import org.apache.ignite.raft.client.ReadCommand;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
@@ -38,7 +52,11 @@ import org.jetbrains.annotations.NotNull;
  * Partition command handler.
  */
 public class PartitionListener implements RaftGroupListener {
-    /** Storage. */
+    /**
+     * Storage.
+     * This is a temporary solution that is used until the persistence layer is implemented.
+     * TODO: IGNITE-14790.
+     */
     private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
 
     /** {@inheritDoc} */
@@ -46,9 +64,27 @@ public class PartitionListener implements RaftGroupListener {
         while (iterator.hasNext()) {
             CommandClosure<ReadCommand> clo = iterator.next();
 
-            assert clo.command() instanceof GetCommand;
+            if (clo.command() instanceof GetCommand) {
+                clo.result(new SingleRowResponse(storage.get(
+                    extractAndWrapKey(((GetCommand)clo.command()).getKeyRow())
+                )));
+            }
+            else if (clo.command() instanceof GetAllCommand) {
+                Set<BinaryRow> keyRows = ((GetAllCommand)clo.command()).getKeyRows();
+
+                assert keyRows != null && !keyRows.isEmpty();
 
-            clo.result(new KVGetResponse(storage.get(extractAndWrapKey(((GetCommand)clo.command()).getKeyRow()))));
+                final Set<BinaryRow> res = keyRows.stream()
+                    .map(this::extractAndWrapKey)
+                    .map(storage::get)
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
+
+                clo.result(new MultiRowsResponse(res));
+            }
+            else
+                assert false : "Command was not found [cmd=" + clo.command() + ']';
         }
     }
 
@@ -58,10 +94,11 @@ public class PartitionListener implements RaftGroupListener {
             CommandClosure<WriteCommand> clo = iterator.next();
 
             if (clo.command() instanceof InsertCommand) {
-                BinaryRow previous = storage.putIfAbsent(
-                    extractAndWrapKey(((InsertCommand)clo.command()).getRow()),
-                    ((InsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((InsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Insert command should have a value.";
+
+                BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);
 
                 clo.result(previous == null);
             }
@@ -91,13 +128,140 @@ public class PartitionListener implements RaftGroupListener {
                     clo.result(false);
             }
             else if (clo.command() instanceof UpsertCommand) {
-                storage.put(
-                    extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
-                    ((UpsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((UpsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Upsert command should have a value.";
+
+                storage.put(extractAndWrapKey(row), row);
 
                 clo.result(null);
             }
+            else if (clo.command() instanceof InsertAllCommand) {
+                Set<BinaryRow> rows = ((InsertAllCommand)clo.command()).getRows();
+
+                assert rows != null && !rows.isEmpty();
+
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
+
+                clo.result(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof UpsertAllCommand) {
+                Set<BinaryRow> rows = ((UpsertAllCommand)clo.command()).getRows();
+
+                assert rows != null && !rows.isEmpty();
+
+                rows.forEach(k -> storage.put(extractAndWrapKey(k), k));
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof DeleteAllCommand) {
+                Set<BinaryRow> rows = ((DeleteAllCommand)clo.command()).getRows();
+
+                assert rows != null && !rows.isEmpty();
+
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> {
+                        if (k.hasValue())
+                            return null;
+                        else
+                            return storage.remove(extractAndWrapKey(k));
+                    })
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
+
+                clo.result(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof DeleteExactCommand) {
+                BinaryRow row = ((DeleteExactCommand)clo.command()).getRow();
+
+                assert row != null;
+                assert row.hasValue();
+
+                final KeyWrapper key = extractAndWrapKey(row);
+                final BinaryRow old = storage.get(key);
+
+                if (old == null || !old.hasValue())
+                    clo.result(false);
+                else
+                    clo.result(equalValues(row, old) && storage.remove(key) != null);
+            }
+            else if (clo.command() instanceof DeleteExactAllCommand) {
+                Set<BinaryRow> rows = ((DeleteExactAllCommand)clo.command()).getRows();
+
+                assert rows != null && !rows.isEmpty();
+
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> {
+                        final KeyWrapper key = extractAndWrapKey(k);
+                        final BinaryRow old = storage.get(key);
+
+                        if (old == null || !old.hasValue() || !equalValues(k, old))
+                            return null;
+
+                        return storage.remove(key);
+                    })
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
+
+                clo.result(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof ReplaceIfExistCommand) {
+                BinaryRow row = ((ReplaceIfExistCommand)clo.command()).getRow();
+
+                assert row != null;
+
+                final KeyWrapper key = extractAndWrapKey(row);
+                final BinaryRow oldRow = storage.get(key);
+
+                if (oldRow == null || !oldRow.hasValue())
+                    clo.result(false);
+                else
+                    clo.result(storage.put(key, row) == oldRow);
+            }
+            else if (clo.command() instanceof GetAndDeleteCommand) {
+                BinaryRow row = ((GetAndDeleteCommand)clo.command()).getKeyRow();
+
+                assert row != null;
+
+                BinaryRow oldRow = storage.remove(extractAndWrapKey(row));
+
+                if (oldRow == null || !oldRow.hasValue())
+                    clo.result(new SingleRowResponse(null));
+                else
+                    clo.result(new SingleRowResponse(oldRow));
+            }
+            else if (clo.command() instanceof GetAndReplaceCommand) {
+                BinaryRow row = ((GetAndReplaceCommand)clo.command()).getRow();
+
+                assert row != null && row.hasValue();
+
+                BinaryRow oldRow = storage.get(extractAndWrapKey(row));
+
+                storage.computeIfPresent(extractAndWrapKey(row), (key, val) -> row);
+
+                if (oldRow == null || !oldRow.hasValue())
+                    clo.result(new SingleRowResponse(null));
+                else
+                    clo.result(new SingleRowResponse(oldRow));
+            }
+            else if (clo.command() instanceof GetAndUpsertCommand) {
+                BinaryRow row = ((GetAndUpsertCommand)clo.command()).getKeyRow();
+
+                assert row != null && row.hasValue();
+
+                BinaryRow oldRow = storage.put(extractAndWrapKey(row), row);
+
+                if (oldRow == null || !oldRow.hasValue())
+                    clo.result(new SingleRowResponse(null));
+                else
+                    clo.result(new SingleRowResponse(oldRow));
+            }
             else
                 assert false : "Command was not found [cmd=" + clo.command() + ']';
         }
@@ -115,28 +279,6 @@ public class PartitionListener implements RaftGroupListener {
     }
 
     /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
-
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
-    }
-
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
-
-        return new KeyWrapper(bytes, row.hash());
-    }
-
-    /**
      * Wrapper provides correct byte[] comparison.
      */
     private static class KeyWrapper {
@@ -175,4 +317,37 @@ public class PartitionListener implements RaftGroupListener {
             return hash;
         }
     }
+
+    /**
+     * Checks whether two rows carry the same value part.
+     *
+     * @param row First row, may be {@code null}.
+     * @param row2 Second row, may be {@code null}.
+     * @return {@code true} if both references are the same (including two {@code null}s) or the value slices
+     *      compare as equal; {@code false} if exactly one row is {@code null}, if only one of the rows has
+     *      a value, or if the value slices differ.
+     */
+    private boolean equalValues(BinaryRow row, BinaryRow row2) {
+        // Same reference, which also covers the case when both rows are null.
+        if (row == row2)
+            return true;
+
+        // Exactly one row is missing, or only one of them has a value part.
+        if (row == null || row2 == null || row.hasValue() != row2.hasValue())
+            return false;
+
+        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+    }
+
+    /**
+     * Makes a wrapped key from a table row.
+     * <p>
+     * The key slice of the row is copied into a new byte array, which is then paired with the row hash.
+     *
+     * @param row Row to take the key from.
+     * @return Key wrapper built from the copied key bytes and {@code row.hash()}.
+     */
+    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
+        // NOTE(review): the array is sized by capacity(); presumably this equals the number of readable bytes
+        // of the key slice - confirm against the BinaryRow implementation.
+        final byte[] bytes = new byte[row.keySlice().capacity()];
+        row.keySlice().get(bytes);
+
+        return new KeyWrapper(bytes, row.hash());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ce67913..8b5213c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -17,18 +17,32 @@
 
 package org.apache.ignite.internal.table.distributed.storage;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
 import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
-import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
 
@@ -71,13 +85,35 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(partId(keyRow)).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        // Run the command on the service registered for the key's partition and unwrap the single-row response.
+        return partitionMap.get(partId(keyRow)).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(SingleRowResponse::getValue);
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        // Split the requested keys into per-partition batches.
+        HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : keyRows) {
+            // A lambda rather than HashSet::new: the method reference binds to HashSet(int) and would use
+            // the partition id as the initial capacity of the set.
+            keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>())
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+        int batchNum = 0;
+
+        // Send one GetAllCommand per partition.
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new GetAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        // Once every batch has completed, merge the per-partition responses into a single list.
+        return CompletableFuture.allOf(futures)
+            .thenApply(response -> Arrays.stream(futures)
+                .map(CompletableFuture::join)
+                .map(MultiRowsResponse::getValues)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
@@ -87,12 +123,30 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows) {
-        return null;
+        // Split the rows into per-partition batches.
+        HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : rows) {
+            // A lambda rather than HashSet::new: the method reference binds to HashSet(int) and would use
+            // the partition id as the initial capacity of the set.
+            keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>())
+                .add(keyRow);
+        }
+
+        CompletableFuture<Void>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+        int batchNum = 0;
+
+        // Send one UpsertAllCommand per partition.
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new UpsertAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        // The returned future completes when all per-partition batches have completed.
+        return CompletableFuture.allOf(futures);
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row) {
-        return null;
+        // Run the command on the service registered for the row's partition and unwrap the single-row response.
+        return partitionMap.get(partId(row)).<SingleRowResponse>run(new GetAndUpsertCommand(row))
+            .thenApply(SingleRowResponse::getValue);
     }
 
     /** {@inheritDoc} */
@@ -102,12 +156,34 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows) {
-        return null;
+        // Split the rows into per-partition batches.
+        HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : rows) {
+            // A lambda rather than HashSet::new: the method reference binds to HashSet(int) and would use
+            // the partition id as the initial capacity of the set.
+            keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>())
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+        int batchNum = 0;
+
+        // Send one InsertAllCommand per partition.
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new InsertAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        // Once every batch has completed, merge the per-partition responses into a single list.
+        return CompletableFuture.allOf(futures)
+            .thenApply(response -> Arrays.stream(futures)
+                .map(CompletableFuture::join)
+                .map(MultiRowsResponse::getValues)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow row) {
-        return null;
+        // The single-row replace is carried by ReplaceIfExistCommand; its result is passed through as a Boolean.
+        return partitionMap.get(partId(row)).<Boolean>run(new ReplaceIfExistCommand(row));
     }
 
     /** {@inheritDoc} */
@@ -117,7 +193,8 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> getAndReplace(BinaryRow row) {
-        return null;
+        // GetAndReplaceCommand is the command answered with a SingleRowResponse holding the previous row.
+        // ReplaceIfExistCommand is answered with a Boolean and must not be used here.
+        return partitionMap.get(partId(row)).<SingleRowResponse>run(new GetAndReplaceCommand(row))
+            .thenApply(SingleRowResponse::getValue);
     }
 
     /** {@inheritDoc} */
@@ -127,22 +204,67 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> deleteExact(BinaryRow oldRow) {
-        return null;
+        // Run the command on the service registered for the row's partition; the result is passed through as a Boolean.
+        return partitionMap.get(partId(oldRow)).<Boolean>run(new DeleteExactCommand(oldRow));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> getAndDelete(BinaryRow row) {
-        return null;
+        // Run the command on the service registered for the row's partition and unwrap the single-row response.
+        return partitionMap.get(partId(row)).<SingleRowResponse>run(new GetAndDeleteCommand(row))
+            .thenApply(SingleRowResponse::getValue);
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows) {
-        return null;
+        // Split the rows into per-partition batches.
+        HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : rows) {
+            // A lambda rather than HashSet::new: the method reference binds to HashSet(int) and would use
+            // the partition id as the initial capacity of the set.
+            keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>())
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+        int batchNum = 0;
+
+        // Send one DeleteAllCommand per partition.
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new DeleteAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        // Once every batch has completed, merge the per-partition responses into a single list.
+        return CompletableFuture.allOf(futures)
+            .thenApply(response -> Arrays.stream(futures)
+                .map(CompletableFuture::join)
+                .map(MultiRowsResponse::getValues)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows) {
-        return null;
+        // Split the rows into per-partition batches.
+        HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : rows) {
+            // A lambda rather than HashSet::new: the method reference binds to HashSet(int) and would use
+            // the partition id as the initial capacity of the set.
+            keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>())
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+        int batchNum = 0;
+
+        // Send one DeleteExactAllCommand per partition.
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new DeleteExactAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        // Once every batch has completed, merge the per-partition responses into a single list.
+        return CompletableFuture.allOf(futures)
+            .thenApply(response -> Arrays.stream(futures)
+                .map(CompletableFuture::join)
+                .map(MultiRowsResponse::getValues)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList()));
     }
 
     /**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
new file mode 100644
index 0000000..8621d20
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the table command listener.
+ * All rows should be removed before returning from each test.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema of the test rows: a single integer column "key" and a single integer column "value". */
+    public static final SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Initializes a table listener before tests.
+     * The listener is created once and kept in a static field, so every test in this class works with
+     * the same instance.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Inserts rows and checks them.
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testInsertCommands() {
+        // Nothing is stored yet: reads return no value and deletes report false.
+        readAndCheck(false);
+
+        delete(false);
+
+        // The first insert of every key is expected to report true.
+        insert(false);
+
+        // Inserting the same keys again is expected to report false.
+        insert(true);
+
+        readAndCheck(true);
+
+        // Clean up: every delete is expected to report true.
+        delete(true);
+    }
+
+    /**
+     * Upserts rows and checks them.
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testUpsertValues() {
+        // Nothing is stored yet.
+        readAndCheck(false);
+
+        // Store rows whose value equals the key.
+        upsert();
+
+        readAndCheck(true);
+
+        // Clean up and verify that the rows are gone.
+        delete(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * Adds rows, replaces and checks them.
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testReplaceCommand() {
+        // Store rows (i, i).
+        upsert();
+
+        // An exact delete of (i, i + 1) must not match the stored (i, i).
+        deleteExactValues(false);
+
+        // Replace (i, i) with (i, i + 1): the expected old row matches, so the replace succeeds.
+        replaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        // The same replace again: the expected old row (i, i) is no longer stored, so nothing changes.
+        replaceValues(false);
+
+        readAndCheck(true, i -> i + 1);
+
+        // Clean up: an exact delete of (i, i + 1) now matches.
+        deleteExactValues(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * The test checks the ReplaceIfExist command (a put which is applied only to an existing row).
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        // No rows are stored: the command reports false and stores nothing.
+        putIfExistValues(false);
+
+        readAndCheck(false);
+
+        // Store rows (i, i), then replace them with (i, i + 1).
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        // Clean up: GetAndDelete returns the rows (i, i + 1).
+        getAndDeleteValues(true);
+
+        readAndCheck(false);
+
+        // GetAndDelete on missing rows returns a null row.
+        getAndDeleteValues(false);
+    }
+
+    /**
+     * The test checks GetAndReplace command.
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndCheck(false);
+
+        // The first upsert of (i, i): no previous row is returned.
+        getAndUpsertValues(false);
+
+        readAndCheck(true);
+
+        // Replace with (i, i + 1): the previous row (i, i) is returned.
+        getAndReplaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        // Upsert (i, i) again: the previous row (i, i + 1) is returned.
+        getAndUpsertValues(true);
+
+        readAndCheck(true);
+
+        // Clean up with a batch exact delete of the rows (i, i).
+        deleteExactAllValues(true);
+
+        readAndCheck(false);
+
+        // On missing rows GetAndReplace returns a null row and the batch exact delete returns nothing.
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     * The test checks a batch upsert command.
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        // Nothing is stored yet: batch read and batch delete return empty responses.
+        readAll(false);
+
+        deleteAll(false);
+
+        // Store rows (i, i) in a single batch.
+        upsertAll();
+
+        readAll(true);
+
+        // Clean up and verify that the rows are gone.
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * The test checks a batch insert command.
+     * All rows are removed before returning.
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        // Nothing is stored yet: batch read and batch delete return empty responses.
+        readAll(false);
+
+        deleteAll(false);
+
+        // The first batch insert is expected to return an empty response.
+        insertAll(false);
+
+        readAll(true);
+
+        // Inserting the same rows again is expected to return all KEY_COUNT rows.
+        insertAll(true);
+
+        // Clean up and verify that the rows are gone.
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Creates an iterator which yields exactly one mocked closure, as needed for a batch operation.
+     *
+     * @param func Function which prepares the mocked closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator with a single element.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** True until the only closure has been handed out. */
+            boolean pending = true;
+
+            @Override public boolean hasNext() {
+                return pending;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> closure = mock(CommandClosure.class);
+
+                func.accept(closure);
+
+                pending = false;
+
+                return closure;
+            }
+        };
+    }
+
+    /**
+     * Creates an iterator which yields {@link #KEY_COUNT} mocked closures, one per key.
+     *
+     * @param func Function which prepares a mocked closure; it receives the zero-based index of the closure.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Index of the closure which will be produced next. */
+            private int idx = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return idx < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> closure = mock(CommandClosure.class);
+
+                func.accept(idx, closure);
+
+                idx++;
+
+                return closure;
+            }
+        };
+    }
+
+    /**
+     * Inserts the rows {@code (i, i)} for all keys in a single batch and checks the response.
+     *
+     * @param existed True if rows are existed (all of them are expected in the response), false if an empty
+     *      response is expected.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            Set<BinaryRow> batch = new HashSet<>(KEY_COUNT);
+
+            for (int key = 0; key < KEY_COUNT; key++)
+                batch.add(getTestRow(key, key));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(batch));
+
+            doAnswer(inv -> {
+                MultiRowsResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertTrue(res.getValues().isEmpty());
+
+                    return null;
+                }
+
+                assertEquals(KEY_COUNT, res.getValues().size());
+
+                for (BinaryRow binRow : res.getValues()) {
+                    Row row = new Row(SCHEMA, binRow);
+
+                    int key = row.intValue(0);
+
+                    assertTrue(key < KEY_COUNT);
+                    assertEquals(key, row.intValue(1));
+                }
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+        }));
+    }
+
+    /**
+     * Upserts the rows {@code (i, i)} for all keys in a single batch and checks that a null result is reported.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            Set<BinaryRow> batch = new HashSet<>(KEY_COUNT);
+
+            for (int key = 0; key < KEY_COUNT; key++)
+                batch.add(getTestRow(key, key));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(batch));
+
+            doAnswer(inv -> {
+                assertNull(inv.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * Deletes all keys in a single batch and checks the response.
+     *
+     * @param existed True if rows are existed (all of them are expected in the response), false if an empty
+     *      response is expected.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            Set<BinaryRow> keys = new HashSet<>(KEY_COUNT);
+
+            for (int key = 0; key < KEY_COUNT; key++)
+                keys.add(getTestKey(key));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keys));
+
+            doAnswer(inv -> {
+                MultiRowsResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertTrue(res.getValues().isEmpty());
+
+                    return null;
+                }
+
+                assertEquals(KEY_COUNT, res.getValues().size());
+
+                for (BinaryRow binRow : res.getValues()) {
+                    Row row = new Row(SCHEMA, binRow);
+
+                    int key = row.intValue(0);
+
+                    assertTrue(key < KEY_COUNT);
+                    assertEquals(key, row.intValue(1));
+                }
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+        }));
+    }
+
+    /**
+     * Reads all keys in a single batch and checks the response.
+     *
+     * @param existed True if rows are existed (all of them are expected in the response), false if an empty
+     *      response is expected.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            Set<BinaryRow> keys = new HashSet<>(KEY_COUNT);
+
+            for (int key = 0; key < KEY_COUNT; key++)
+                keys.add(getTestKey(key));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keys));
+
+            doAnswer(inv -> {
+                MultiRowsResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertTrue(res.getValues().isEmpty());
+
+                    return null;
+                }
+
+                assertEquals(KEY_COUNT, res.getValues().size());
+
+                for (BinaryRow binRow : res.getValues()) {
+                    Row row = new Row(SCHEMA, binRow);
+
+                    int key = row.intValue(0);
+
+                    assertTrue(key < KEY_COUNT);
+                    assertEquals(key, row.intValue(1));
+                }
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+        }));
+    }
+
+    /**
+     * Upserts the row {@code (i, i)} for every key, one command per key, and checks that a null result is reported.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                assertNull(inv.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+        }));
+    }
+
+    /**
+     * Deletes every key, one command per key, and checks the reported result.
+     *
+     * @param existed True if rows are existed (the delete must report true), false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                assertEquals(existed, inv.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     * When the rows are expected to exist, the value of every row must be equal to its key.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndCheck(boolean existed) {
+        // Identity mapper: the expected value is the key itself.
+        readAndCheck(existed, i -> i);
+    }
+
+    /**
+     * Reads every key with a separate Get command and compares the returned rows with the expected values.
+     *
+     * @param existed True if rows are existed, false if a null row is expected in every response.
+     * @param keyValueMapper Mapper a key to the value which will be expected.
+     */
+    private void readAndCheck(boolean existed, Function<Integer, Integer> keyValueMapper) {
+        commandListener.onRead(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                SingleRowResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertNull(res.getValue());
+
+                    return null;
+                }
+
+                assertNotNull(res.getValue());
+
+                assertTrue(res.getValue().hasValue());
+
+                Row row = new Row(SCHEMA, res.getValue());
+
+                assertEquals(i, row.intValue(0));
+                assertEquals(keyValueMapper.apply(i), row.intValue(1));
+
+                return null;
+            }).when(clo).result(any(SingleRowResponse.class));
+
+            when(clo.command()).thenReturn(new GetCommand(getTestKey(i)));
+        }));
+    }
+
+    /**
+     * Inserts the row {@code (i, i)} for every key, one command per key, and checks the reported result.
+     *
+     * @param existed True if rows are existed (the insert must report false), false otherwise.
+     */
+    private void insert(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new InsertCommand(getTestRow(i, i)));
+
+            // The answer is registered for any result: stubbing only result(!existed) would skip the assertion
+            // whenever the listener reports the opposite value, so the check could never fail.
+            doAnswer(invocation -> {
+                assertEquals(!existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * Deletes the exact rows {@code (i, i)} for all keys in a single batch and checks the response.
+     *
+     * @param existed True if rows are existed (all of them are expected in the response), false if an empty
+     *      response is expected.
+     */
+    private void deleteExactAllValues(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            // Parameterized set instead of a raw HashSet, like in the other batch helpers.
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new DeleteExactAllCommand(rows));
+
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * Runs a GetAndReplace command with the row {@code (i, i + 1)} for every key and checks the returned row.
+     *
+     * @param existed True if rows are existed (the returned row must be {@code (i, i)}), false if a null row
+     *      is expected.
+     */
+    private void getAndReplaceValues(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                SingleRowResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertNull(res.getValue());
+
+                    return null;
+                }
+
+                Row prev = new Row(SCHEMA, res.getValue());
+
+                assertEquals(i, prev.intValue(0));
+                assertEquals(i, prev.intValue(1));
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new GetAndReplaceCommand(getTestRow(i, i + 1)));
+        }));
+    }
+
+    /**
+     * Runs a GetAndUpsert command with the row {@code (i, i)} for every key and checks the returned row.
+     *
+     * @param existed True if rows are existed (the returned row must be {@code (i, i + 1)}), false if a null row
+     *      is expected.
+     */
+    private void getAndUpsertValues(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                SingleRowResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertNull(res.getValue());
+
+                    return null;
+                }
+
+                Row prev = new Row(SCHEMA, res.getValue());
+
+                assertEquals(i, prev.intValue(0));
+                assertEquals(i + 1, prev.intValue(1));
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new GetAndUpsertCommand(getTestRow(i, i)));
+        }));
+    }
+
+    /**
+     * Runs a GetAndDelete command for every key and checks the returned row.
+     *
+     * @param existed True if rows are existed (the returned row must be {@code (i, i + 1)}), false if a null row
+     *      is expected.
+     */
+    private void getAndDeleteValues(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                SingleRowResponse res = inv.getArgument(0);
+
+                if (!existed) {
+                    assertNull(res.getValue());
+
+                    return null;
+                }
+
+                Row removed = new Row(SCHEMA, res.getValue());
+
+                assertEquals(i, removed.intValue(0));
+                assertEquals(i + 1, removed.intValue(1));
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new GetAndDeleteCommand(getTestKey(i)));
+        }));
+    }
+
+    /**
+     * Runs a ReplaceIfExist command with the row {@code (i, i + 1)} for every key and checks the reported result.
+     *
+     * @param existed True if rows are existed (the command must report true), false otherwise.
+     */
+    private void putIfExistValues(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                boolean replaced = inv.getArgument(0);
+
+                assertEquals(existed, replaced);
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new ReplaceIfExistCommand(getTestRow(i, i + 1)));
+        }));
+    }
+
+    /**
+     * Runs a DeleteExact command with the row {@code (i, i + 1)} for every key and checks the reported result.
+     *
+     * @param existed True if rows are existed with exactly this value (the command must report true),
+     *      false otherwise.
+     */
+    private void deleteExactValues(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                boolean removed = inv.getArgument(0);
+
+                assertEquals(existed, removed);
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new DeleteExactCommand(getTestRow(i, i + 1)));
+        }));
+    }
+
+    /**
+     * Runs a Replace command built from the rows {@code (i, i)} and {@code (i, i + 1)} for every key and checks
+     * the reported result.
+     *
+     * @param existed True if the replace is expected to report true, false otherwise.
+     */
+    private void replaceValues(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            doAnswer(inv -> {
+                assertTrue(inv.getArgument(0) instanceof Boolean);
+
+                boolean replaced = inv.getArgument(0);
+
+                assertEquals(existed, replaced);
+
+                return null;
+            }).when(clo).result(any());
+
+            when(clo.command()).thenReturn(new ReplaceCommand(getTestRow(i, i), getTestRow(i, i + 1)));
+        }));
+    }
+
+    /**
+     * Prepares a test row which contains only key field.
+     *
+     * @param key Value written to the key column.
+     * @return Row.
+     */
+    @NotNull private Row getTestKey(int key) {
+        // NOTE(review): 4096 looks like a buffer size for the assembler - confirm against RowAssembler.
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+
+        rowBuilder.appendInt(key);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
+    }
+
+    /**
+     * Prepares a test row which contains key and value fields.
+     *
+     * @param key Value written to the key column.
+     * @param val Value written to the value column.
+     * @return Row.
+     */
+    @NotNull private Row getTestRow(int key, int val) {
+        // NOTE(review): 4096 looks like a buffer size for the assembler - confirm against RowAssembler.
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+
+        // The key is appended first, then the value.
+        rowBuilder.appendInt(key);
+        rowBuilder.appendInt(val);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
+    }
+}