Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/11/24 07:50:31 UTC

[GitHub] [ignite-3] sanpwc commented on a change in pull request #400: IGNITE-15085

sanpwc commented on a change in pull request #400:
URL: https://github.com/apache/ignite-3/pull/400#discussion_r754846315



##########
File path: modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
##########
@@ -231,7 +231,7 @@
      * @param val Value to be associated with the specified key.
      * @return {@code True} if an old value was replaced, {@code false} otherwise.
      */
-    boolean replace(@NotNull K key, V val);
+    boolean replace(@NotNull K key, @NotNull V val);

Review comment:
       For this and the other @NotNull changes: same as above.

##########
File path: modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
##########
@@ -148,7 +148,7 @@
      * @param val Value to be associated with the specified key.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, V val);
+    @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, @NotNull V val);

Review comment:
       What happens if val (or key) is null? The common solution is to throw NullPointerException (*). Do we stick to that approach too? If so, let's state it in the javadoc.
   * https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#put-K-V-
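
   To illustrate the convention referenced above, here is a minimal sketch. It assumes a hypothetical in-memory `SketchKvView` class (not part of Ignite) that rejects null arguments with NullPointerException and says so in its javadoc:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a key-value view; not the Ignite implementation. */
class SketchKvView<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();

    /**
     * Puts a value if the key is absent.
     *
     * @param key Key, must not be null.
     * @param val Value, must not be null.
     * @return {@code true} if the value was stored, {@code false} if the key was already mapped.
     * @throws NullPointerException If {@code key} or {@code val} is null.
     */
    boolean putIfAbsent(K key, V val) {
        // Fail fast with a named argument so the caller sees which one was null.
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return data.putIfAbsent(key, val) == null;
    }
}
```

   Whether the real API should throw or accept null values is exactly the open question in this comment; the sketch only shows how the "throw and document" option could look.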

##########
File path: modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
##########
@@ -307,7 +317,7 @@ public void writeKvTuple(
             for (var i = 0; i < val.columnCount(); i++) {
                 var colName = val.columnName(i);
                 var col = schema.column(colName);
-    
+                

Review comment:
       damn, auto formatting))

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
##########
@@ -30,7 +33,8 @@
     /**
      * The callback to apply read commands.
      *
-     * <p>If the runtime exception is thrown during iteration all unprocessed read requests will be aborted with the STM exception.
+     * <p>If the runtime exception is thrown during iteration all unprocessed read requests will be

Review comment:
       Line
   break
   is
   not
   needed
   here
   because
   the
   line
   is
   less
   than
   140
   symbols
   .
   ))

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -238,4 +261,47 @@ public void stop() throws Exception {
     public void stopRaftGroup(String groupId) {
         raftServer.stopRaftGroup(groupId);
     }
+
+    /**
+     * Applies a command to a local raft group.
+     *
+     * @param groupId Group id.
+     * @param cmd The command.
+     * @return The future.
+     */
+    public CompletableFuture<?> apply(String groupId, Command cmd) {

Review comment:
       I believe that having apply() here breaks encapsulation. Semantically, apply is a method of either the raft group or the state machine. Loza shouldn't have such methods, especially with a Command parameter.

##########
File path: modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
##########
@@ -148,8 +166,8 @@ public String raftGroupId() {
      * Check meta storage entry.
      *
      * @param metaStorage Meta storage service.
-     * @param expected    Expected entry.
-     * @throws ExecutionException   If failed.
+     * @param expected Expected entry.

Review comment:
       The padding is still not the preferred one, though I'm not sure whether it's mandatory.

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
##########
@@ -117,123 +121,135 @@ public JraftServerImpl(
         this.dataPath = dataPath;
         this.nodeManager = new NodeManager();
         this.opts = opts;
-
+        
         if (opts.getServerName() == null) {
             opts.setServerName(service.localConfiguration().getName());
         }
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}

Review comment:
       Here and everywhere - one-liner is fine.

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
##########
@@ -101,65 +114,93 @@ public void afterFollowerStop(RaftGroupService service) throws Exception {
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void afterSnapshot(RaftGroupService service) throws Exception {
+        TxManager txManager = new TxManagerImpl(clientService(), new HeapLockManager());
+
         var table = new InternalTableImpl(
                 "table",
                 new IgniteUuid(UUID.randomUUID(), 0),
                 Map.of(0, service),
                 1,
                 NetworkAddress::toString,
+                txManager,
                 mock(TableStorage.class)
         );
 
         table.upsert(SECOND_VALUE, null).get();
+
+        assertNotNull(table.get(SECOND_KEY, null).join());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted, boolean interactedAfterSnapshot) {
-        PartitionStorage storage = getListener(restarted, raftGroupId()).getStorage();
+    public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted,
+            boolean interactedAfterSnapshot) {
+        VersionedRowStore storage = getListener(restarted, raftGroupId()).getStorage();
 
         Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
         Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
 
-        ByteBuffer buffer = key.keySlice();
-        byte[] keyBytes = new byte[buffer.capacity()];
-        buffer.get(keyBytes);
+        return () -> {
+            BinaryRow read = storage.get(key, null);
 
-        SimpleDataRow finalRow = new SimpleDataRow(keyBytes, null);
-        SimpleDataRow finalValue = new SimpleDataRow(keyBytes, value.bytes());
+            if (read == null) {
+                return false;
+            }
 
-        return () -> {
-            DataRow read = storage.read(finalRow);
-            return Objects.equals(finalValue, read);
+            return Arrays.equals(value.bytes(), read.bytes());
         };
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public Path getListenerPersistencePath(PartitionListener listener) {
         return paths.get(listener);
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public RaftGroupListener createListener(Path workDir) {
+    public RaftGroupListener createListener(ClusterService service, Path workDir) {
         return paths.entrySet().stream()
                 .filter(entry -> entry.getValue().equals(workDir))
                 .map(Map.Entry::getKey)
                 .findAny()
                 .orElseGet(() -> {
-                    PartitionListener listener = new PartitionListener(new ConcurrentHashMapPartitionStorage());
+                    JraftServerImpl srv = servers.stream()
+                            .filter(s -> s.clusterService().topologyService().localMember().equals(service.topologyService().localMember()))
+                            .findFirst().get();
+
+                    // We need raft manager instance to initialize transaction manager.
+                    Loza raftMgr = new Loza(srv);
+
+                    TableTxManagerImpl txManager = new TableTxManagerImpl(service,
+                            new HeapLockManager(), raftMgr);
+
+                    txManager.start(); // Init listener.

Review comment:
       And what about stop()?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
##########
@@ -278,17 +277,16 @@ public boolean replace(@NotNull K key, V val) {
      * {@inheritDoc}
      */
     @Override
-    public boolean replace(@NotNull K key, V oldVal, V newVal) {
+    public boolean replace(@NotNull K key, @NotNull V oldVal, @NotNull V newVal) {
         return sync(replaceAsync(key, oldVal, newVal));
     }
     
     /**
      * {@inheritDoc}
      */
     @Override
-    public @NotNull
-    CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val) {
-        BinaryRow row = marshal(key, val);
+    public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val) {

Review comment:
       It seems that val should also be @NotNull.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
##########
@@ -43,277 +44,286 @@
  * Key-value view implementation for binary user-object representation.
  */
 public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValueView<Tuple, Tuple> {
-    /** Marshaller. */
+    /**
+     * Marshaller.
+     */
     private final TupleMarshallerImpl marsh;
-    
-    /** Table manager. */
+
+    /**
+     * Table manager.
+     */
     private final TableManager tblMgr;
-    
+
     /**
      * Constructor.
      *
-     * @param tbl       Table storage.

Review comment:
       Here and below: padding.

##########
File path: modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
##########
@@ -333,11 +343,12 @@ public void writeKvTuple(
      * @param schema Schema.
      * @param out Out.
      */
-    public void writeKvTuples(Map<Tuple, Tuple> pairs, ClientSchema schema, ClientMessagePacker out) {
+    public void writeKvTuples(Map<Tuple, Tuple> pairs, ClientSchema schema,

Review comment:
       Why? `public void writeKvTuples(Map<Tuple, Tuple> pairs, ClientSchema schema, ClientMessagePacker out) {` is less than 140 symbols.

##########
File path: modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
##########
@@ -19,24 +19,55 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Ignite Transactions facade.
  */
 public interface IgniteTransactions {
+    /**
+     * Returns a facade with new default timeout.
+     *
+     * @param timeout The timeout.

Review comment:
       Let's specify the time unit; currently it's not clear whether it's seconds, milliseconds, or something else.
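
   One way to remove the ambiguity is sketched below. `SketchTransactions` and the `TimeUnit` overload are hypothetical and are not the Ignite API; the alternative is simply to name the unit in the javadoc of the existing `long timeout` parameter.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical facade used only to illustrate the comment; not the Ignite API. */
class SketchTransactions {
    /** Default timeout, stored in milliseconds. */
    private final long timeoutMillis;

    SketchTransactions(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Returns a facade with a new default timeout.
     *
     * @param timeout The timeout value.
     * @param unit Unit of {@code timeout}.
     * @return A facade using the new timeout.
     */
    SketchTransactions withTimeout(long timeout, TimeUnit unit) {
        // Normalize to a single internal unit so callers never have to guess.
        return new SketchTransactions(unit.toMillis(timeout));
    }

    /** Returns the default timeout in milliseconds. */
    long timeoutMillis() {
        return timeoutMillis;
    }
}
```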

##########
File path: modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
##########
@@ -19,24 +19,55 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Ignite Transactions facade.
  */
 public interface IgniteTransactions {
+    /**
+     * Returns a facade with new default timeout.
+     *
+     * @param timeout The timeout.
+     *
+     * @return A facade using a new timeout.
+     */
+    IgniteTransactions withTimeout(long timeout);
+
     /**
      * Begins a transaction.
      *
-     * @return The future.
+     * @return The started transaction.
+     */
+    Transaction begin();
+
+    /**
+     * Begins an async transaction.
+     *
+     * @return The future holding the started transaction.
      */
     CompletableFuture<Transaction> beginAsync();
 
     /**
-     * Synchronously executes a closure within a transaction.
+     * Executes a closure within a transaction.
      *
-     * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
      *
      * @param clo The closure.
      */
     void runInTransaction(Consumer<Transaction> clo);

Review comment:
       Here and below: it seems we should list all possible exceptions in the javadoc, each with an explanation of its cause. For example, is it possible for TransactionException to be thrown here?
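
   A rough sketch of what such a documented contract could look like. `SketchTx`, `SketchTxException` and `SketchTxRunner` are hypothetical names, and the rollback-then-rethrow behavior is an assumption made for illustration; the actual behavior is what this comment asks the author to document.

```java
import java.util.function.Function;

/** Hypothetical transaction handle; not the Ignite Transaction interface. */
class SketchTx {
    boolean committed;
    boolean rolledBack;

    void commit() {
        committed = true;
    }

    void rollback() {
        rolledBack = true;
    }
}

/** Hypothetical unchecked exception standing in for a transaction failure. */
class SketchTxException extends RuntimeException {
    SketchTxException(Throwable cause) {
        super(cause);
    }
}

class SketchTxRunner {
    /**
     * Executes a closure within a transaction and returns a result.
     *
     * @param tx Transaction to run in (passed explicitly to keep the sketch small).
     * @param clo The closure.
     * @param <T> Closure result type.
     * @return The result.
     * @throws SketchTxException If the closure or the commit throws; the transaction is rolled back first (assumed).
     */
    static <T> T runInTransaction(SketchTx tx, Function<SketchTx, T> clo) {
        try {
            T res = clo.apply(tx);

            // Normal completion: commit automatically.
            tx.commit();

            return res;
        } catch (RuntimeException e) {
            // Assumed policy: roll back and surface a single documented exception type.
            tx.rollback();

            throw new SketchTxException(e);
        }
    }
}
```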

##########
File path: modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
##########
@@ -40,7 +41,8 @@
  * Persistent (rocksdb-based) meta storage raft group snapshots tests.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnapshotTest<MetaStorageListener> {
+public class ItMetaStorageServicePersistenceTest extends

Review comment:
       Here and elsewhere. Auto-formatting issue, please fix, there's nothing wrong with the size of this line.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
##########
@@ -834,6 +834,30 @@ public static String identity(Object obj) {
             }
         }
     }
+    
+    /**
+     * Produces auto-generated output of string presentation for given object.
+     *
+     * @param obj Object to get a string presentation for.
+     * @return String presentation of the given object.
+     */
+    public static String toString(Object obj) {
+        assert obj != null;

Review comment:
       Why do we need such an assert here? Objects.toString() is fine with null values: https://docs.oracle.com/javase/8/docs/api/java/util/Objects.html#toString-java.lang.Object-
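
   For reference, a tiny sketch of the JDK behavior mentioned above. `NullToStringSketch` is a hypothetical helper and says nothing about how IgniteToStringBuilder is implemented:

```java
import java.util.Objects;

class NullToStringSketch {
    /** Returns the string form of {@code obj}, or the text "null" when {@code obj} is null. */
    static String describe(Object obj) {
        // Objects.toString delegates to String.valueOf, which handles null without throwing.
        return Objects.toString(obj);
    }
}
```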

##########
File path: modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
##########
@@ -85,13 +85,17 @@ public IgniteUuid tableId() {
         return id;
     }
     
-    /** {@inheritDoc} */
+    /**

Review comment:
       What about one-liner?

##########
File path: modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
##########
@@ -19,24 +19,55 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Ignite Transactions facade.
  */
 public interface IgniteTransactions {
+    /**
+     * Returns a facade with new default timeout.
+     *
+     * @param timeout The timeout.
+     *
+     * @return A facade using a new timeout.
+     */
+    IgniteTransactions withTimeout(long timeout);
+
     /**
      * Begins a transaction.
      *
-     * @return The future.
+     * @return The started transaction.
+     */
+    Transaction begin();
+
+    /**
+     * Begins an async transaction.
+     *
+     * @return The future holding the started transaction.
      */
     CompletableFuture<Transaction> beginAsync();
 
     /**
-     * Synchronously executes a closure within a transaction.
+     * Executes a closure within a transaction.
      *
-     * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
      *
      * @param clo The closure.
      */
     void runInTransaction(Consumer<Transaction> clo);
+
+    /**
+     * Executes a closure within a transaction and returns a result.
+     *
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
+     *
+     * <p>This method will automatically enlist all tables into the transaction, but the execution of
+     * the transaction shouldn't leave starting thread or an exception will be thrown.
+     *
+     * @param clo The closure.
+     * @param <T> Closure result type.
+     * @return The result.
+     */
+    <T> T runInTransaction(Function<Transaction, T> clo);

Review comment:
       Same as above.

##########
File path: modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
##########
@@ -63,7 +65,9 @@ void tearDown() throws Exception {
         }
     }
 
-    /** {@inheritDoc} */

Review comment:
       Here and elsewhere. Auto-formatting issue, please fix, there's nothing wrong with one-liners.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java
##########
@@ -17,51 +17,23 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Collection;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * The command inserts a batch rows.
  */
-public class InsertAllCommand implements WriteCommand {
-    /** Binary rows. */
-    private transient Set<BinaryRow> rows;
-
-    /*
-     * Row bytes.
-     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowsBytes;
-
+public class InsertAllCommand extends MultiKeyCommand implements WriteCommand {
     /**
      * Creates a new instance of InsertAllCommand with the given rows to be inserted. The {@code rows} should not be {@code null} or empty.
      *
      * @param rows Binary rows.
+     * @param ts The timestamp.

Review comment:
       Wow, wow, it was `timestamp` elsewhere above; why is it not `ts`?

##########
File path: modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
##########
@@ -28,248 +28,314 @@
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Fake internal table.
  */
 public class FakeInternalTable implements InternalTable {
-    /** Table name. */
+    /**
+     * Table name.
+     */
     private final String tableName;

Review comment:
       What's wrong with a one-liner here and below?

##########
File path: modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
##########
@@ -68,43 +68,36 @@
     /** Starting client port. */
     private static final int CLIENT_PORT = 6003;
 
-    /**
-     * Peers list.
-     */
+    /** Peers list. */
     private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
             .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
             .map(Peer::new)
             .collect(Collectors.toUnmodifiableList());
 
-    /** Factory. */
     private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
 
-    /** Network factory. */
     private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
 
     private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
 
     @WorkDirectory
     private Path workDir;
 
-    /** Cluster. */
     private final List<ClusterService> cluster = new ArrayList<>();
 
-    /** Servers. */
-    private final List<JraftServerImpl> servers = new ArrayList<>();
+    protected final List<JraftServerImpl> servers = new ArrayList<>();
 
-    /** Clients. */
     private final List<RaftGroupService> clients = new ArrayList<>();
 
-    /** Executor for raft group services. */
     private ScheduledExecutorService executor;
 
     /**
      * Create executor for raft group services.
      */
     @BeforeEach
     public void beforeTest() {
-        executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME));
+        executor = new ScheduledThreadPoolExecutor(20,

Review comment:
       Here and below. Auto-formatting issues, e.g. unnecessary line breaks.

##########
File path: modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
##########
@@ -28,248 +28,314 @@
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Fake internal table.
  */
 public class FakeInternalTable implements InternalTable {
-    /** Table name. */
+    /**
+     * Table name.
+     */
     private final String tableName;
-    
-    /** Table ID. */
+
+    /**
+     * Table ID.
+     */
     private final IgniteUuid tableId;
-    
-    /** Table data. */
+
+    /**
+     * Table data.
+     */
     private final ConcurrentHashMap<ByteBuffer, BinaryRow> data = new ConcurrentHashMap<>();
-    
+
     /**
      * Constructor.
      *
      * @param tableName Name.
-     * @param tableId   Id.
+     * @param tableId Id.
      */
     public FakeInternalTable(String tableName, IgniteUuid tableId) {
         this.tableName = tableName;
         this.tableId = tableId;
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public @NotNull TableStorage storage() {
         throw new UnsupportedOperationException("Not implemented yet");
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int partitions() {
         return 1;
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public @NotNull IgniteUuid tableId() {
         return tableId;
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public @NotNull String tableName() {
         return tableName;
     }
     
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<BinaryRow> get(BinaryRow keyRow, @Nullable Transaction tx) {
+    public CompletableFuture<BinaryRow> get(BinaryRow keyRow, @Nullable InternalTransaction tx) {
         return CompletableFuture.completedFuture(data.get(keyRow.keySlice()));
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows, @Nullable Transaction tx) {
+    public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows,
+            @Nullable InternalTransaction tx) {
         var res = new ArrayList<BinaryRow>();
-        
+
         for (var key : keyRows) {
             var val = get(key, null);
-    
+
             if (val != null) {
                 res.add(val.getNow(null));
             }
         }
-        
+
         return CompletableFuture.completedFuture(res);
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public CompletableFuture<Void> upsert(BinaryRow row, @Nullable Transaction tx) {
+    public CompletableFuture<Void> upsert(BinaryRow row, @Nullable InternalTransaction tx) {
         data.put(row.keySlice(), row);
-        
+
         return CompletableFuture.completedFuture(null);
     }
-    
-    /** {@inheritDoc} */
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
+    public CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows,

Review comment:
       Here and below: it seems we don't need such formatting here; the line is less than 140 symbols.

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
##########
@@ -60,4 +61,11 @@
      * @return Local peer or null if the group is not started.
      */
     @Nullable Peer localPeer(String groupId);
+
+    /**
+     * Returns a set of started partition groups.
+     *
+     * @return Started groups.
+     */
+    Set<String> startedGroups();

Review comment:
       @TestOnly

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -79,9 +83,10 @@
     private final ScheduledExecutorService executor;
 
     /**
-     * Constructor.
+     * The constructor.
      *
      * @param clusterNetSvc Cluster network service.
+     * @param dataPath Data path.

Review comment:
       Proper padding is missing.

##########
File path: modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
##########
@@ -279,11 +277,12 @@ public void testSnapshot(TestData testData, TestInfo testInfo) throws Exception
      * Creates a closure that will be executed periodically to check if the snapshot and (conditionally on the {@link
      * TestData#interactAfterSnapshot}) the raft log was successfully restored by the follower node.
      *
-     * @param restarted               Restarted follower node.

Review comment:
       Auto-formatting issue, padding is expected here.

##########
File path: modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
##########
@@ -326,12 +326,14 @@ protected T getListener(JraftServerImpl server, String grpId) {
      * Wait for topology.
      *
      * @param cluster The cluster.
-     * @param exp     Expected count.

Review comment:
       Here and elsewhere: auto-formatting issue, padding is expected here.

##########
File path: modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
##########
@@ -162,10 +157,13 @@ private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
             this.interactAfterSnapshot = interactAfterSnapshot;
         }
 
-        /** {@inheritDoc} */
+        /**
+         * {@inheritDoc}

Review comment:
       One-liner is finer here.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
##########
@@ -231,7 +231,7 @@
     EIO(1014),
 
     /**
-     * Invalid value.
+     * Invalid value

Review comment:
       Why?

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
##########
@@ -39,9 +43,9 @@
     /**
      * The callback to apply write commands.
      *
-     * <p>If the runtime exception is thrown during iteration, all entries starting from current iteration are considered unapplied, the

Review comment:
       Same as above. You probably have incorrect IDEA settings.

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
##########
@@ -90,23 +91,26 @@
 
     /** Request executor. */
     private ExecutorService requestExecutor;
-
+    
     /**
      * Constructor.
      *
      * @param service  Cluster service.
      * @param dataPath Data path.
      */
-    public JraftServerImpl(ClusterService service, Path dataPath) {
+    public JraftServerImpl(
+            ClusterService service,
+            Path dataPath
+    ) {
         this(service, dataPath, new NodeOptions());
     }
-
+    
     /**
      * Constructor.
      *
      * @param service  Cluster service.
      * @param dataPath Data path.
-     * @param opts     Default node options.
+     * @param opts Default node options.

Review comment:
       Here and everywhere - padding.

##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
##########
@@ -90,23 +91,26 @@
 
     /** Request executor. */
     private ExecutorService requestExecutor;
-
+    

Review comment:
       Here and everywhere - formatting.

##########
File path: modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
##########
@@ -195,7 +195,7 @@ private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
         var addr = new NetworkAddress(getLocalAddress(), PORT);
 
         ClusterService service = clusterService(PORT + idx, List.of(addr), true);
-
+    

Review comment:
        Nope

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
##########
@@ -76,13 +85,16 @@ public void beforeFollowerStop(RaftGroupService service) throws Exception {
                 Map.of(0, service),
                 1,
                 NetworkAddress::toString,
+                new TxManagerImpl(clientService(), new HeapLockManager()),
                 mock(TableStorage.class)
         );
 
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}

Review comment:
       Formatting.

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
##########
@@ -51,177 +52,206 @@
  * Storage implementation based on {@link ConcurrentHashMap}.
  */
 public class ConcurrentHashMapPartitionStorage implements PartitionStorage {
-    /** Name of the snapshot file. */

Review comment:
       Here and below - formatting.

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
##########
@@ -51,177 +52,206 @@
  * Storage implementation based on {@link ConcurrentHashMap}.
  */
 public class ConcurrentHashMapPartitionStorage implements PartitionStorage {
-    /** Name of the snapshot file. */
+    /**
+     * Name of the snapshot file.
+     */
     private static final String SNAPSHOT_FILE = "snapshot_file";
-
-    /** Storage content. */
-    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
-
-    /** {@inheritDoc} */
+    
+    /**
+     * Storage content.
+     */
+    private final ConcurrentSkipListMap<ByteArray, byte[]> map = new ConcurrentSkipListMap<>();
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int partitionId() {
         return 0;
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     @Nullable
     public DataRow read(SearchRow key) throws StorageException {
         byte[] keyBytes = key.keyBytes();
-
+        
         byte[] valueBytes = map.get(new ByteArray(keyBytes));
-
+        
         return valueBytes == null ? null : new SimpleDataRow(keyBytes, valueBytes);
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public Collection<DataRow> readAll(List<? extends SearchRow> keys) {
         return keys.stream()
                 .map(this::read)
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void write(DataRow row) throws StorageException {
         map.put(new ByteArray(row.keyBytes()), row.valueBytes());
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void writeAll(List<? extends DataRow> rows) throws StorageException {
         rows.forEach(this::write);
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
         return rows.stream()
-                .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)
+                .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null
+                        ? null : row)
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void remove(SearchRow key) throws StorageException {
         map.remove(new ByteArray(key.keyBytes()));
     }
-
-    /** {@inheritDoc} */
+    
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) {
         var skippedRows = new ArrayList<SearchRow>(keys.size());
-
+        

Review comment:
       Here and below: formatting.

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
##########
@@ -76,13 +85,16 @@ public void beforeFollowerStop(RaftGroupService service) throws Exception {
                 Map.of(0, service),
                 1,
                 NetworkAddress::toString,
+                new TxManagerImpl(clientService(), new HeapLockManager()),

Review comment:
       Same as above.
   Where are txManager's start() and stop() calls? Even if start() currently only registers message handlers, we should call start() and stop() properly, because otherwise we'll definitely forget to do so when the start method evolves.
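   To illustrate the start()/stop() pairing requested above, here is a minimal sketch. `Component`, `RecordingTxManager` and `withStarted` are hypothetical stand-ins invented for this example; they are not the Ignite `TxManager` API. The only point is that stop() sits in a finally block, so it runs even when the test body throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: all names here are hypothetical stand-ins, not Ignite classes. */
final class LifecycleSketch {
    /** Hypothetical component with an explicit lifecycle. */
    interface Component {
        void start();

        void stop();
    }

    /** Records lifecycle calls so that the pairing can be inspected. */
    static final class RecordingTxManager implements Component {
        final List<String> events = new ArrayList<>();

        @Override
        public void start() {
            events.add("start");
        }

        @Override
        public void stop() {
            events.add("stop");
        }
    }

    /** Runs the body between start() and stop(); stop() is called even if the body throws. */
    static void withStarted(Component component, Runnable body) {
        component.start();

        try {
            body.run();
        } finally {
            component.stop();
        }
    }

    public static void main(String[] args) {
        RecordingTxManager txManager = new RecordingTxManager();

        withStarted(txManager, () -> txManager.events.add("test body"));

        System.out.println(txManager.events);
    }
}
```

   With such a wrapper, a later change that makes start() do real work cannot silently leave a test running against a component that was never started.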

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
##########
@@ -91,6 +103,7 @@ public void afterFollowerStop(RaftGroupService service) throws Exception {
                 Map.of(0, service),
                 1,
                 NetworkAddress::toString,
+                new TxManagerImpl(clientService(), new HeapLockManager()),

Review comment:
       Same as above.

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
##########
@@ -182,7 +223,7 @@ private static Row createKeyRow(long id) {
     /**
      * Creates a {@link Row} with the supplied key and value.
      *
-     * @param id    Key.
+     * @param id Key.

Review comment:
       Formatting.

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
##########
@@ -144,13 +152,24 @@ public void setUp(TestInfo testInfo) throws Exception {
 
         raftSrv.start();
 
+        String grpName = "test_part_grp";
+
         List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
 
         mockStorage = mock(PartitionStorage.class);
 
+        TxManager txManager = new TxManagerImpl(network, new HeapLockManager());

Review comment:
       Where are txManager's start() and stop() calls? Even if start() currently only registers message handlers, we should call start() and stop() properly, because otherwise we'll definitely forget to do so when the start method evolves.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
##########
@@ -43,277 +44,286 @@
  * Key-value view implementation for binary user-object representation.
  */
 public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValueView<Tuple, Tuple> {
-    /** Marshaller. */
+    /**
+     * Marshaller.

Review comment:
       Here and below: formatting.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -318,13 +328,14 @@ private void onTableCreateInternal(@NotNull ConfigurationNotificationEvent<Table
                             toAdd.removeAll(oldPartitionAssignment);
 
                             InternalTable internalTable = tablesById.get(tblId).internalTable();
-
+                            
                             // Create new raft nodes according to new assignments.
                             futures[i] = raftMgr.updateRaftGroup(
                                     raftGroupName(tblId, partId),
                                     newPartitionAssignment,
                                     toAdd,
-                                    () -> new PartitionListener(internalTable.storage().getOrCreatePartition(partId))
+                                    () -> new PartitionListener(tblId,

Review comment:
       Formatting.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -58,148 +59,166 @@
      * Asynchronously gets a row with same key columns values as given one from the table.
      *
      * @param keyRow Row with key columns set.
-     * @param tx     The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
+     * @throws LockException If a lock can't be acquired by some reason.
      */
-    CompletableFuture<BinaryRow> get(BinaryRow keyRow, @Nullable Transaction tx);
+    CompletableFuture<BinaryRow> get(BinaryRow keyRow, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously get rows from the table.
      *
      * @param keyRows Rows with key columns set.
-     * @param tx      The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows, @Nullable Transaction tx);
+    CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows,
+            @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously inserts a row into the table if does not exist or replaces the existed one.
      *
      * @param row Row to insert into the table.
-     * @param tx  The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Void> upsert(BinaryRow row, @Nullable Transaction tx);
+    CompletableFuture<Void> upsert(BinaryRow row, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously inserts a row into the table if does not exist or replaces the existed one.
      *
      * @param rows Rows to insert into the table.
-     * @param tx   The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows, @Nullable Transaction tx);
+    CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows, @Nullable InternalTransaction tx);
 
     /**
-     * Asynchronously inserts a row into the table or replaces if exists and return replaced previous row.
+     * Asynchronously inserts a row into the table or replaces if exists and return replaced
+     * previous row.
      *
      * @param row Row to insert into the table.
-     * @param tx  The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row, @Nullable Transaction tx);
+    CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously inserts a row into the table if not exists.
      *
      * @param row Row to insert into the table.
-     * @param tx  The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Boolean> insert(BinaryRow row, @Nullable Transaction tx);
+    CompletableFuture<Boolean> insert(BinaryRow row, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously insert rows into the table which do not exist, skipping existed ones.
      *
      * @param rows Rows to insert into the table.
-     * @param tx   The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows, @Nullable Transaction tx);
+    CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows,
+            @Nullable InternalTransaction tx);
 
     /**
-     * Asynchronously replaces an existed row associated with the same key columns values as the given one has.
+     * Asynchronously replaces an existed row associated with the same key columns values as the
+     * given one has.
      *
      * @param row Row to replace with.
-     * @param tx  The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Boolean> replace(BinaryRow row, @Nullable Transaction tx);
+    CompletableFuture<Boolean> replace(BinaryRow row, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously replaces an expected row in the table with the given new one.
      *
      * @param oldRow Row to replace.
      * @param newRow Row to replace with.
-     * @param tx     The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow, @Nullable Transaction tx);
+    CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow,
+            @Nullable InternalTransaction tx);
 
     /**
-     * Asynchronously gets an existed row associated with the same key columns values as the given one has, then replaces with the given
-     * one.
+     * Asynchronously gets an existed row associated with the same key columns values as the given
+     * one has, then replaces with the given one.
      *
      * @param row Row to replace with.
-     * @param tx  The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<BinaryRow> getAndReplace(BinaryRow row, @Nullable Transaction tx);
+    CompletableFuture<BinaryRow> getAndReplace(BinaryRow row, @Nullable InternalTransaction tx);
 
     /**
-     * Asynchronously deletes a row with the same key columns values as the given one from the table.
+     * Asynchronously deletes a row with the same key columns values as the given one from the
+     * table.
      *
      * @param keyRow Row with key columns set.
-     * @param tx     The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Boolean> delete(BinaryRow keyRow, @Nullable Transaction tx);
+    CompletableFuture<Boolean> delete(BinaryRow keyRow, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously deletes given row from the table.
      *
      * @param oldRow Row to delete.
-     * @param tx     The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Boolean> deleteExact(BinaryRow oldRow, @Nullable Transaction tx);
+    CompletableFuture<Boolean> deleteExact(BinaryRow oldRow, @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously gets then deletes a row with the same key columns values from the table.
      *
      * @param row Row with key columns set.
-     * @param tx  The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<BinaryRow> getAndDelete(BinaryRow row, @Nullable Transaction tx);
+    CompletableFuture<BinaryRow> getAndDelete(BinaryRow row, @Nullable InternalTransaction tx);
 
     /**
-     * Asynchronously remove rows with the same key columns values as the given one has from the table.
+     * Asynchronously remove rows with the same key columns values as the given one has from the
+     * table.
      *
      * @param rows Rows with key columns set.
-     * @param tx   The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
-    CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows, @Nullable Transaction tx);
+    CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows,
+            @Nullable InternalTransaction tx);
 
     /**
      * Asynchronously remove given rows from the table.
      *
      * @param rows Rows to delete.
-     * @param tx   The transaction.
+     * @param tx The transaction.
      * @return Future representing pending completion of the operation.
      */
     CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows,
-            @Nullable Transaction tx);
+            @Nullable InternalTransaction tx);
+
+    /**
+     * Returns a partition for a key.
+     *
+     * @param keyRow The key.
+     * @return The partition.
+     */
+    int partition(BinaryRow keyRow);
 
     /**
-     * Scans given partition, providing {@link Publisher} that reactively notifies about partition rows.
+     * Scans given partition, providing {@link Publisher} that reactively notifies about partition
+     * rows.
      *
-     * @param p  The partition.
+     * @param p The partition.
      * @param tx The transaction.
      * @return {@link Publisher} that reactively notifies about partition rows.
      */
-    @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
+    Publisher<BinaryRow> scan(int p, InternalTransaction tx);

Review comment:
       tx is still @Nullable and the result is still @NotNull.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -102,7 +104,9 @@
  */
 public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
         IgniteComponent {
-    /** The logger. */
+    /**
+     * The logger.

Review comment:
       Here and below: formatting.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -23,14 +23,15 @@
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Internal table facade provides low-level methods for table operations. The facade hides TX/replication protocol over table storage
- * abstractions.
+ * Internal table facade provides low-level methods for table operations. The facade hides
+ * TX/replication protocol over table storage abstractions.

Review comment:
       Here and below: formatting.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
##########
@@ -100,4 +108,33 @@ public SchemaRegistry schemaView() {
     @Override public KeyValueView<Tuple, Tuple> keyValueView() {
         return new KeyValueBinaryViewImpl(tbl, schemaReg, tblMgr, null);
     }
+
+    /**
+     * Updates internal table raft group service for given partition.
+     *
+     * @param p Partition.
+     * @param raftGrpSvc Raft group service.
+     */
+    public void updateInternalTableRaftGroupService(int p, RaftGroupService raftGrpSvc) {

Review comment:
       This is never used, let's remove it. BTW, why don't we have an inspection for that?

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
##########
@@ -101,65 +114,93 @@ public void afterFollowerStop(RaftGroupService service) throws Exception {
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void afterSnapshot(RaftGroupService service) throws Exception {
+        TxManager txManager = new TxManagerImpl(clientService(), new HeapLockManager());
+
         var table = new InternalTableImpl(
                 "table",
                 new IgniteUuid(UUID.randomUUID(), 0),
                 Map.of(0, service),
                 1,
                 NetworkAddress::toString,
+                txManager,
                 mock(TableStorage.class)
         );
 
         table.upsert(SECOND_VALUE, null).get();
+
+        assertNotNull(table.get(SECOND_KEY, null).join());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted, boolean interactedAfterSnapshot) {
-        PartitionStorage storage = getListener(restarted, raftGroupId()).getStorage();
+    public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted,
+            boolean interactedAfterSnapshot) {
+        VersionedRowStore storage = getListener(restarted, raftGroupId()).getStorage();
 
         Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
         Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
 
-        ByteBuffer buffer = key.keySlice();
-        byte[] keyBytes = new byte[buffer.capacity()];
-        buffer.get(keyBytes);
+        return () -> {
+            BinaryRow read = storage.get(key, null);
 
-        SimpleDataRow finalRow = new SimpleDataRow(keyBytes, null);
-        SimpleDataRow finalValue = new SimpleDataRow(keyBytes, value.bytes());
+            if (read == null) {
+                return false;
+            }
 
-        return () -> {
-            DataRow read = storage.read(finalRow);
-            return Objects.equals(finalValue, read);
+            return Arrays.equals(value.bytes(), read.bytes());
         };
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public Path getListenerPersistencePath(PartitionListener listener) {
         return paths.get(listener);
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public RaftGroupListener createListener(Path workDir) {
+    public RaftGroupListener createListener(ClusterService service, Path workDir) {
         return paths.entrySet().stream()
                 .filter(entry -> entry.getValue().equals(workDir))
                 .map(Map.Entry::getKey)
                 .findAny()
                 .orElseGet(() -> {
-                    PartitionListener listener = new PartitionListener(new ConcurrentHashMapPartitionStorage());
+                    JraftServerImpl srv = servers.stream()
+                            .filter(s -> s.clusterService().topologyService().localMember().equals(service.topologyService().localMember()))
+                            .findFirst().get();
+
+                    // We need raft manager instance to initialize transaction manager.
+                    Loza raftMgr = new Loza(srv);

Review comment:
       For all Ignite components we should call start and stop, with stop inside finally or afterTest, as is done, for example, in org.apache.ignite.distributed.ItTxDistributedTestSingleNode#after
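   One way to make this hard to forget is to track every started component and stop them all from the after-test hook. The sketch below is an assumption-laden illustration: `Component`, `Named`, `startAndTrack` and `stopAll` are hypothetical names, and the real Ignite component interface and the referenced `after` method may look different.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch only: a hypothetical helper, not part of Ignite. */
final class ComponentCleanupSketch {
    /** Hypothetical component with an explicit lifecycle. */
    interface Component {
        void start();

        void stop() throws Exception;
    }

    /** Test component that logs its lifecycle calls. */
    static final class Named implements Component {
        private final String name;

        private final List<String> log;

        Named(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void start() {
            log.add("start " + name);
        }

        @Override
        public void stop() {
            log.add("stop " + name);
        }
    }

    /** Started components; the most recently started one is on top. */
    private final Deque<Component> started = new ArrayDeque<>();

    /** Starts the component and remembers it for cleanup. */
    <T extends Component> T startAndTrack(T component) {
        component.start();
        started.push(component);

        return component;
    }

    /** Meant for an after-test hook: stops everything in reverse start order. */
    void stopAll() throws Exception {
        Exception failure = null;

        while (!started.isEmpty()) {
            try {
                started.pop().stop();
            } catch (Exception e) {
                // Keep stopping the remaining components; report all failures at the end.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        ComponentCleanupSketch cleanup = new ComponentCleanupSketch();

        cleanup.startAndTrack(new Named("raftMgr", log));
        cleanup.startAndTrack(new Named("txManager", log));
        cleanup.stopAll();

        System.out.println(log);
    }
}
```

   Stopping in reverse order is a design choice made for this sketch: a component started later may depend on one started earlier, so it is stopped first.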

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -318,13 +328,14 @@ private void onTableCreateInternal(@NotNull ConfigurationNotificationEvent<Table
                             toAdd.removeAll(oldPartitionAssignment);
 
                             InternalTable internalTable = tablesById.get(tblId).internalTable();
-
+                            

Review comment:
       Formatting.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -17,52 +17,24 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Collection;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * The command deletes entries by the passed keys.
  */
-public class DeleteAllCommand implements WriteCommand {
-    /** Binary rows. */
-    private transient Set<BinaryRow> rows;
-
-    /*
-     * Row bytes.
-     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowsBytes;
-
+public class DeleteAllCommand extends MultiKeyCommand implements WriteCommand {
     /**
      * Creates a new instance of DeleteAllCommand with the given set of keys to be deleted. The {@code keyRows} should not be {@code null}
      * or empty.
      *
      * @param keyRows Collection of binary row keys to be deleted.
+     * @param timestamp The timestamp.

Review comment:
       Here and below for other commands: could you please enrich the javadoc? What does timestamp mean? I'd rather see an explanation that it's a sort of tx binding, where the timestamp is a unique, monotonically increasing kind of tx id generated on begin().
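   The idea behind that explanation can be sketched as follows. This is not the actual `org.apache.ignite.internal.tx.Timestamp`; the `Timestamp`, `Tx`, `DeleteAllCommand` and `begin()` below are hypothetical simplifications that assume a single JVM and a plain counter. They only show a command carrying the id that was generated when its transaction began.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: every type here is a hypothetical stand-in, not the Ignite implementation. */
final class TxTimestampSketch {
    /** Hypothetical timestamp: a unique, increasing value within one JVM. */
    static final class Timestamp implements Comparable<Timestamp> {
        private static final AtomicLong COUNTER = new AtomicLong();

        private final long value;

        private Timestamp(long value) {
            this.value = value;
        }

        /** Returns a fresh timestamp; the counter guarantees that no value repeats. */
        static Timestamp next() {
            return new Timestamp(COUNTER.incrementAndGet());
        }

        @Override
        public int compareTo(Timestamp other) {
            return Long.compare(value, other.value);
        }

        @Override
        public String toString() {
            return "ts=" + value;
        }
    }

    /** Hypothetical transaction: the timestamp is assigned exactly once, on begin(). */
    static final class Tx {
        final Timestamp timestamp = Timestamp.next();
    }

    /** Hypothetical command: the timestamp binds the command to its transaction. */
    static final class DeleteAllCommand {
        final int keyCount;

        final Timestamp timestamp;

        DeleteAllCommand(int keyCount, Timestamp timestamp) {
            this.keyCount = keyCount;
            this.timestamp = timestamp;
        }
    }

    /** Begins a new hypothetical transaction. */
    static Tx begin() {
        return new Tx();
    }

    public static void main(String[] args) {
        Tx first = begin();
        Tx second = begin();

        DeleteAllCommand cmd = new DeleteAllCommand(3, first.timestamp);

        System.out.println(cmd.timestamp + " is bound to the first tx: " + (cmd.timestamp == first.timestamp));
        System.out.println("first < second: " + (first.timestamp.compareTo(second.timestamp) < 0));
    }
}
```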

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -58,148 +59,166 @@
      * Asynchronously gets a row with same key columns values as given one from the table.
      *
      * @param keyRow Row with key columns set.
-     * @param tx     The transaction.

Review comment:
       Here and below: padding.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -17,52 +17,24 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Collection;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * The command deletes entries by the passed keys.
  */
-public class DeleteAllCommand implements WriteCommand {
-    /** Binary rows. */
-    private transient Set<BinaryRow> rows;
-
-    /*
-     * Row bytes.
-     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowsBytes;
-
+public class DeleteAllCommand extends MultiKeyCommand implements WriteCommand {

Review comment:
       Good idea with Multi/SingleKeyCommand!

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -148,25 +155,28 @@
     /**
      * Creates a new table manager.
      *
-     * @param tablesCfg          Tables configuration.
-     * @param dataStorageCfg     Data storage configuration.
-     * @param raftMgr            Raft manager.
-     * @param baselineMgr        Baseline manager.
+     * @param tablesCfg Tables configuration.

Review comment:
       Here and below: padding.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/** State machine command to finish a transaction. */
+public class FinishTxCommand implements WriteCommand {
+    /** The timestamp. */
+    private final Timestamp timestamp;
+
+    /** Commit or rollback state. */
+    private boolean finish;
+
+    /**
+     * The constructor.
+     *
+     * @param timestamp The timestamp.

Review comment:
       Params formatting.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -17,88 +17,104 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
-import java.nio.ByteBuffer;
+import static org.apache.ignite.lang.LoggerMessageHelper.format;
+
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
 import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
-import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
-import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
-import org.apache.ignite.internal.storage.basic.InsertInvokeClosure;
-import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
 import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
 import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
 import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.MultiKeyCommand;
 import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
 import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.SingleKeyCommand;
+import org.apache.ignite.internal.table.distributed.command.TransactionalCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
 import org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
 import org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
 import org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.ReadCommand;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 
 /**
  * Partition command handler.
  */
 public class PartitionListener implements RaftGroupListener {
-    /** Partition storage. */
-    private final PartitionStorage storage;
+    /** Lock id. */
+    private final IgniteUuid lockId;
+
+    /** The versioned storage. */
+    private final VersionedRowStore storage;
 
     /** Cursors map. */
     private final Map<IgniteUuid, CursorMeta> cursors;
 
+    /** Transaction manager. */
+    private final TxManager txManager;
+
     /**
-     * Constructor.
+     * The constructor.
      *
-     * @param partitionStorage Storage.
+     * @param lockId Lock id.

Review comment:
       Padding.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
##########
@@ -18,47 +18,21 @@
 package org.apache.ignite.internal.table.distributed.command;
 
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * The command inserts a row.
  */
-public class InsertCommand implements WriteCommand {
-    /** Binary row. */
-    private transient BinaryRow row;
-
-    /*
-     * Row bytes.
-     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowBytes;
-
+public class InsertCommand extends SingleKeyCommand implements WriteCommand {
     /**
      * Creates a new instance of InsertCommand with the given row to be inserted. The {@code row} should not be {@code null}.
      *
      * @param row Binary row.
+     * @param timestamp The timestamp.

Review comment:
       Glad to see `timestamp` here, I've missed it.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -544,6 +524,35 @@ public void onShutdown() {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> onBeforeApply(Command command) {
+        if (command instanceof SingleKeyCommand) {

Review comment:
       What if the given command is non-transactional?
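   One possible shape of an answer, purely as a sketch: branch on a transactional marker type and complete immediately for everything else. The interfaces and classes below are hypothetical, reduced to what the example needs. They reuse names that appear in the diff (`TransactionalCommand`, `InsertCommand`, `ScanCloseCommand`) but are not the Ignite classes, and the returned strings merely stand in for whatever the real listener would do.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch only: hypothetical types, not the Ignite PartitionListener. */
final class BeforeApplySketch {
    /** Hypothetical base type for all commands. */
    interface Command {
    }

    /** Hypothetical marker for commands that belong to a transaction. */
    interface TransactionalCommand extends Command {
        long timestamp();
    }

    /** Hypothetical transactional command. */
    static final class InsertCommand implements TransactionalCommand {
        private final long timestamp;

        InsertCommand(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public long timestamp() {
            return timestamp;
        }
    }

    /** Hypothetical command that carries no transaction binding. */
    static final class ScanCloseCommand implements Command {
    }

    /** Describes the pre-apply step; the string stands in for real lock handling. */
    static CompletableFuture<String> onBeforeApply(Command command) {
        if (command instanceof TransactionalCommand) {
            long ts = ((TransactionalCommand) command).timestamp();

            return CompletableFuture.completedFuture("acquire locks for tx " + ts);
        }

        // Non-transactional command: there is no tx to lock on behalf of, so complete right away.
        return CompletableFuture.completedFuture("nothing to do");
    }

    public static void main(String[] args) {
        System.out.println(onBeforeApply(new InsertCommand(42L)).join());
        System.out.println(onBeforeApply(new ScanCloseCommand()).join());
    }
}
```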

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -559,67 +568,36 @@ private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
         return new SimpleDataRow(key, row.bytes());
     }
 
-    /**
-     * Adapter that converts a {@link BinaryRow} into a {@link SearchRow}.
-     */
-    private static class BinarySearchRow implements SearchRow {
-        /** Search key. */
-        private final byte[] keyBytes;
-
-        /** Source row. */
-        private final BinaryRow sourceRow;
-
-        /**
-         * Constructor.
-         *
-         * @param row Row to search for.
-         */
-        BinarySearchRow(BinaryRow row) {
-            sourceRow = row;
-            keyBytes = new byte[row.keySlice().capacity()];
-
-            row.keySlice().get(keyBytes);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public byte @NotNull [] keyBytes() {
-            return keyBytes;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public @NotNull ByteBuffer key() {
-            return ByteBuffer.wrap(keyBytes);
-        }
-    }
-
     /**
      * Returns underlying storage.
      */
     @TestOnly
-    public PartitionStorage getStorage() {
+    public VersionedRowStore getStorage() {
         return storage;
     }
 
     /**
      * Cursor meta information: origin node id and type.
      */
     private class CursorMeta {
-        /** Cursor. */
-        private final Cursor<DataRow> cursor;
+        /**
+         * Cursor.

Review comment:
       One-line Javadoc comments are still fine.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/SingleKeyCommand.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+
+/**
+ * A single key transactional command.
+ */
+public abstract class SingleKeyCommand implements TransactionalCommand, Serializable {
+    /** Binary key row. */
+    private transient BinaryRow keyRow;
+
+    /** The timestamp. */
+    private final Timestamp timestamp;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] keyRowBytes;
+
+    /**
+     * The constructor.
+     *
+     * @param keyRow The row.
+     * @param timestamp The timestamp.
+     */
+    public SingleKeyCommand(BinaryRow keyRow, Timestamp timestamp) {

Review comment:
       Same as for MultiKeyCommand: did you consider adding proper @Nullable/@NotNull annotations?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -144,25 +164,46 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
                 handleScanRetrieveBatchCommand((CommandClosure<ScanRetrieveBatchCommand>) clo);
             } else if (command instanceof ScanCloseCommand) {
                 handleScanCloseCommand((CommandClosure<ScanCloseCommand>) clo);
+            } else if (command instanceof FinishTxCommand) {
+                handleFinishTxCommand((CommandClosure<FinishTxCommand>) clo);
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
         });
     }
 
+    /**
+     * Attempts to enlist a command into a transaction.
+     *
+     * @param command The command.
+     * @param clo The closure.
+     * @return {@code true} if a command is compatible with a transaction state or a command is not transactional.
+     */
+    private boolean tryEnlistIntoTransaction(Command command, CommandClosure<?> clo) {
+        if (command instanceof TransactionalCommand) {
+            Timestamp ts = ((TransactionalCommand) command).getTimestamp();
+
+            TxState state = txManager.getOrCreateTransaction(ts);
+
+            if (state != null && state != TxState.PENDING) {
+                clo.result(new TransactionException(format("Failed to enlist a key into a transaction, state={}", state)));
+
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     /**
      * Handler for the {@link GetCommand}.
      *
      * @param clo Command closure.
      */
     private void handleGetCommand(CommandClosure<GetCommand> clo) {
-        BinaryRow keyRow = clo.command().getKeyRow();
+        GetCommand cmd = (GetCommand) clo.command();

Review comment:
       Here and below: the cast is redundant.
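
To illustrate, a minimal sketch with hypothetical stand-ins for `CommandClosure` and `GetCommand` (the real Ignite types are not reproduced here, and `getKey()` is invented for the demo). Because the closure is parameterized by the command type, `command()` already returns `GetCommand`.

```java
// Hypothetical stand-ins for the real Ignite types; only the generic shape matters.
interface Command {}

final class GetCommand implements Command {
    private final String key;

    GetCommand(String key) {
        this.key = key;
    }

    String getKey() {
        return key;
    }
}

interface CommandClosure<R extends Command> {
    R command();
}

final class CastSketch {
    /** The type argument already fixes the return type of command(), so no cast is needed. */
    static String handleGetCommand(CommandClosure<GetCommand> clo) {
        GetCommand cmd = clo.command();

        return cmd.getKey();
    }

    public static void main(String[] args) {
        GetCommand cmd = new GetCommand("k1");

        System.out.println(handleGetCommand(() -> cmd));
    }
}
```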

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/MultiKeyCommand.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
+
+/**
+ * A multi key transactional command.
+ */
+public abstract class MultiKeyCommand implements TransactionalCommand, Serializable {
+    /** Binary rows. */
+    private transient Collection<BinaryRow> rows;
+
+    /** The timestamp. */
+    private Timestamp timestamp;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * The constructor.
+     *
+     * @param rows Rows.
+     * @param ts The timestamp.
+     */
+    public MultiKeyCommand(Collection<BinaryRow> rows, Timestamp ts) {
+        assert rows != null && !rows.isEmpty();

Review comment:
       To be consistent, let's add a @NotNull annotation to rows and a @Nullable annotation to ts.

##########
File path: modules/schema/pom.xml
##########
@@ -50,7 +50,7 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-metastorage</artifactId>
+            <artifactId>ignite-configuration</artifactId>

Review comment:
       I mean adding the test scope.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TransactionalCommand.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import org.apache.ignite.internal.tx.Timestamp;
+
+/**
+ * A marker interface for a transactional command.
+ */
+public interface TransactionalCommand {
+    /**
+     * Returns a timestamp.
+     *
+     * @return The timestamp.
+     */
+    public Timestamp getTimestamp();

Review comment:
       The contract isn't clear. Can the timestamp be null? Did you consider adding a @Nullable or @NotNull annotation?
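
A minimal sketch of what an explicit contract could look like, assuming the timestamp must never be null. If null timestamps are legal, the same pattern applies with @Nullable and without the check. The `NotNull` annotation and `Timestamp` class below are local stand-ins so the sketch compiles without dependencies (the PR itself imports `org.jetbrains.annotations.NotNull`), and `SingleKeyCommandSketch` is a hypothetical name.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

// Local stand-in for org.jetbrains.annotations.NotNull.
@Target({ElementType.METHOD, ElementType.PARAMETER})
@interface NotNull {}

// Hypothetical timestamp type, for illustration only.
final class Timestamp {
    final long value;

    Timestamp(long value) {
        this.value = value;
    }
}

interface TransactionalCommand {
    /**
     * Returns the transaction timestamp.
     *
     * @return The timestamp, never {@code null}.
     */
    @NotNull Timestamp getTimestamp();
}

final class SingleKeyCommandSketch implements TransactionalCommand {
    private final Timestamp timestamp;

    SingleKeyCommandSketch(@NotNull Timestamp timestamp) {
        // Fail fast so that the documented contract actually holds.
        this.timestamp = Objects.requireNonNull(timestamp, "timestamp");
    }

    @Override
    public @NotNull Timestamp getTimestamp() {
        return timestamp;
    }

    public static void main(String[] args) {
        System.out.println(new SingleKeyCommandSketch(new Timestamp(42L)).getTimestamp().value);
    }
}
```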

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -559,67 +568,36 @@ private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
         return new SimpleDataRow(key, row.bytes());
     }
 
-    /**
-     * Adapter that converts a {@link BinaryRow} into a {@link SearchRow}.
-     */
-    private static class BinarySearchRow implements SearchRow {
-        /** Search key. */
-        private final byte[] keyBytes;
-
-        /** Source row. */
-        private final BinaryRow sourceRow;
-
-        /**
-         * Constructor.
-         *
-         * @param row Row to search for.
-         */
-        BinarySearchRow(BinaryRow row) {
-            sourceRow = row;
-            keyBytes = new byte[row.keySlice().capacity()];
-
-            row.keySlice().get(keyBytes);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public byte @NotNull [] keyBytes() {
-            return keyBytes;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public @NotNull ByteBuffer key() {
-            return ByteBuffer.wrap(keyBytes);
-        }
-    }
-
     /**
      * Returns underlying storage.
      */
     @TestOnly
-    public PartitionStorage getStorage() {
+    public VersionedRowStore getStorage() {
         return storage;
     }
 
     /**
      * Cursor meta information: origin node id and type.
      */
     private class CursorMeta {
-        /** Cursor. */
-        private final Cursor<DataRow> cursor;
+        /**
+         * Cursor.
+         */
+        private final Cursor<BinaryRow> cursor;
 
-        /** Id of the node that creates cursor. */
+        /**
+         * Id of the node that creates cursor.
+         */
         private final String requesterNodeId;
 
         /**
          * The constructor.
          *
-         * @param cursor          Cursor.
+         * @param cursor Cursor.

Review comment:
       Padding.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -144,25 +164,46 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
                 handleScanRetrieveBatchCommand((CommandClosure<ScanRetrieveBatchCommand>) clo);
             } else if (command instanceof ScanCloseCommand) {
                 handleScanCloseCommand((CommandClosure<ScanCloseCommand>) clo);
+            } else if (command instanceof FinishTxCommand) {
+                handleFinishTxCommand((CommandClosure<FinishTxCommand>) clo);
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
         });
     }
 
+    /**
+     * Attempts to enlist a command into a transaction.
+     *
+     * @param command The command.

Review comment:
       Padding.

##########
File path: modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
##########
@@ -19,24 +19,55 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Ignite Transactions facade.
  */
 public interface IgniteTransactions {
+    /**
+     * Returns a facade with new default timeout.
+     *
+     * @param timeout The timeout.
+     *
+     * @return A facade using a new timeout.
+     */
+    IgniteTransactions withTimeout(long timeout);
+
     /**
      * Begins a transaction.
      *
-     * @return The future.
+     * @return The started transaction.
+     */
+    Transaction begin();
+
+    /**
+     * Begins an async transaction.
+     *
+     * @return The future holding the started transaction.
      */
     CompletableFuture<Transaction> beginAsync();
 
     /**
-     * Synchronously executes a closure within a transaction.
+     * Executes a closure within a transaction.
      *
-     * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
      *
      * @param clo The closure.
      */
     void runInTransaction(Consumer<Transaction> clo);

Review comment:
       It seems that `Consumer<Transaction> clo` or `Function<Transaction, T> clo` might contain non-transactional logic. What will happen in that case? Either way, we should explicitly specify the behavior in the Javadoc.
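
To make the concern concrete, a minimal sketch with a hypothetical in-memory `FakeTransaction` (not the Ignite API). Committing on normal completion follows the Javadoc above; rolling back on an exception is an assumption. The point is that a side effect performed inside the closure but outside the transaction is not undone by a rollback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a transaction; not the Ignite API.
final class FakeTransaction {
    boolean committed;
    boolean rolledBack;

    void commit() {
        committed = true;
    }

    void rollback() {
        rolledBack = true;
    }
}

final class TxRunnerSketch {
    /** Commits if the closure returns normally; rolling back on an exception is an assumption. */
    static void runInTransaction(FakeTransaction tx, Consumer<FakeTransaction> clo) {
        try {
            clo.accept(tx);
        } catch (RuntimeException e) {
            tx.rollback();

            throw e;
        }

        tx.commit();
    }

    public static void main(String[] args) {
        List<String> sideEffects = new ArrayList<>();
        FakeTransaction tx = new FakeTransaction();

        try {
            runInTransaction(tx, t -> {
                sideEffects.add("notification sent"); // Non-transactional logic.

                throw new IllegalStateException("closure failed");
            });
        } catch (IllegalStateException expected) {
            // The transaction was rolled back, but the side effect above remains.
        }

        System.out.println(tx.rolledBack + " " + sideEffects);
    }
}
```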

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
##########
@@ -226,4 +227,9 @@
      * Shutdown and cleanup resources for this instance.
      */
     void shutdown();
+
+    /**
+     * @return Cluster service.
+     */
+    ClusterService clusterService();

Review comment:
       Well, it's still not clear why we need clusterService in RaftGroupService, even with the @TestOnly annotation. It seems that within tests you already have the cluster service for any particular raftGroupService at hand.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -17,52 +17,24 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Collection;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * The command deletes entries by the passed keys.
  */
-public class DeleteAllCommand implements WriteCommand {
-    /** Binary rows. */
-    private transient Set<BinaryRow> rows;
-
-    /*
-     * Row bytes.
-     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowsBytes;
-
+public class DeleteAllCommand extends MultiKeyCommand implements WriteCommand {
     /**
      * Creates a new instance of DeleteAllCommand with the given set of keys to be deleted. The {@code keyRows} should not be {@code null}
      * or empty.
      *
      * @param keyRows Collection of binary row keys to be deleted.
+     * @param timestamp The timestamp.

Review comment:
       Here and below for other commands: param formatting.

##########
File path: modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
##########
@@ -190,10 +191,11 @@ public static void writeTuple(
      *
      * @param packer Packer.
      * @param tuples Tuples.
+     * @param schemaRegistry The registry.
      * @throws IgniteException on failed serialization.
      */
-    public static void writeTuples(ClientMessagePacker packer, Collection<Tuple> tuples) {
-        writeTuples(packer, tuples, TuplePart.KEY_AND_VAL);
+    public static void writeTuples(ClientMessagePacker packer, Collection<Tuple> tuples, SchemaRegistry schemaRegistry) {

Review comment:
       Could you please answer this?

##########
File path: modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
##########
@@ -19,24 +19,55 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Ignite Transactions facade.
  */
 public interface IgniteTransactions {

Review comment:
       Could you please answer this?



