You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2024/02/23 16:56:03 UTC

(ignite-3) branch main updated: IGNITE-21600 Make partition-operations a plain executor (#3281)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4484f549d6 IGNITE-21600 Make partition-operations a plain executor (#3281)
4484f549d6 is described below

commit 4484f549d677b20fa8e6f677555f2f8429c725aa
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Feb 23 20:55:58 2024 +0400

    IGNITE-21600 Make partition-operations a plain executor (#3281)
---
 .../ItPlacementDriverReplicaSideTest.java          | 16 +++---
 .../ignite/internal/replicator/ReplicaManager.java | 18 ++-----
 .../ignite/internal/replicator/ReplicaService.java | 14 ++----
 .../internal/replicator/ReplicaManagerTest.java    | 16 +++---
 .../ignite/internal/app/ThreadPoolsManager.java    | 20 +++++---
 .../ignite/distributed/ReplicaUnavailableTest.java | 16 +++---
 .../internal/table/distributed/TableManager.java   | 37 ++++++--------
 .../table/distributed/TableManagerTest.java        | 16 +++---
 .../apache/ignite/distributed/ItTxTestCluster.java | 16 +++---
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 58 +++-------------------
 10 files changed, 88 insertions(+), 139 deletions(-)

diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 2e46b2749c..e01245edb5 100644
--- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -40,6 +40,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -73,7 +76,6 @@ import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.TestReplicaMessagesFactory;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.topology.LogicalTopologyServiceTestImpl;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -117,18 +119,18 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest {
     private final Map<String, Loza> raftManagers = new HashMap<>();
     private final Map<String, TopologyAwareRaftGroupServiceFactory> raftClientFactory = new HashMap<>();
 
-    private StripedThreadPoolExecutor partitionOperationsExecutor;
+    private ExecutorService partitionOperationsExecutor;
 
     /** List of services to have to close before the test will be completed. */
     private final List<Closeable> servicesToClose = new ArrayList<>();
 
     @BeforeEach
     public void beforeTest(TestInfo testInfo) {
-        partitionOperationsExecutor = new StripedThreadPoolExecutor(
-                20,
-                NamedThreadFactory.create("test", "partition-operations", log),
-                false,
-                0
+        partitionOperationsExecutor = new ThreadPoolExecutor(
+                0, 20,
+                0, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create("test", "partition-operations", log)
         );
 
         placementDriverNodeNames = IntStream.range(BASE_PORT, BASE_PORT + 3).mapToObj(port -> testNodeName(testInfo, port))
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 3ec6fa1a05..d193cad4d5 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -72,9 +72,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
@@ -135,11 +133,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
     /** Scheduled executor for idle safe time sync. */
     private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor;
 
-    /**
-     * Chooses a stripe using {@link ReplicationGroupStripes#stripeFor(ReplicationGroupId, StripedThreadPoolExecutor)}
-     * so that requests concerning the same {@link ReplicationGroupId} are executed on the same thread.
-     */
-    private final ExecutorChooser<ReplicationGroupId> requestStripeChooser;
+    private final Executor requestsExecutor;
 
     /** Set of message groups to handler as replica requests. */
     private final Set<Class<?>> messageGroupsToHandle;
@@ -168,7 +162,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             HybridClock clock,
             Set<Class<?>> messageGroupsToHandle,
             PlacementDriver placementDriver,
-            StripedThreadPoolExecutor requestsExecutor
+            Executor requestsExecutor
     ) {
         this(
                 nodeName,
@@ -201,7 +195,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             HybridClock clock,
             Set<Class<?>> messageGroupsToHandle,
             PlacementDriver placementDriver,
-            StripedThreadPoolExecutor requestsExecutor,
+            Executor requestsExecutor,
             LongSupplier idleSafeTimePropagationPeriodMsSupplier
     ) {
         this.clusterNetSvc = clusterNetSvc;
@@ -211,10 +205,9 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
         this.handler = this::onReplicaMessageReceived;
         this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived;
         this.placementDriver = placementDriver;
+        this.requestsExecutor = requestsExecutor;
         this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier;
 
-        requestStripeChooser = new ChooseExecutorForReplicationGroup(requestsExecutor);
-
         scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
                 1,
                 NamedThreadFactory.create(nodeName, "scheduled-idle-safe-time-sync-thread", LOG)
@@ -241,8 +234,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
 
         ReplicaRequest request = (ReplicaRequest) message;
 
-        Executor stripeExecutor = requestStripeChooser.choose(request.groupId());
-        stripeExecutor.execute(() -> handleReplicaRequest(request, senderConsistentId, correlationId));
+        requestsExecutor.execute(() -> handleReplicaRequest(request, senderConsistentId, correlationId));
     }
 
     private void handleReplicaRequest(ReplicaRequest request, String senderConsistentId, @Nullable Long correlationId) {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index b1889fba29..a4b981cfaf 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -42,7 +43,6 @@ import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.network.ClusterNode;
 
 /** The service is intended to execute requests on replicas. */
@@ -88,7 +88,7 @@ public class ReplicaService {
             MessagingService messagingService,
             HybridClock clock,
             String localConsistentId,
-            StripedThreadPoolExecutor executor
+            Executor executor
     ) {
         this(
                 wrapMessagingService(messagingService, localConsistentId, executor),
@@ -96,17 +96,11 @@ public class ReplicaService {
         );
     }
 
-    private static JumpToExecutorByConsistentIdAfterSend wrapMessagingService(
-            MessagingService messagingService,
-            String localConsistentId,
-            StripedThreadPoolExecutor executor
-    ) {
-        ChooseExecutorForReplicationGroup chooserByGroupId = new ChooseExecutorForReplicationGroup(executor);
-
+    private static MessagingService wrapMessagingService(MessagingService messagingService, String localConsistentId, Executor executor) {
         return new JumpToExecutorByConsistentIdAfterSend(
                 messagingService,
                 localConsistentId,
-                request -> chooserByGroupId.choose(((ReplicaRequest) request).groupId())
+                request -> executor
         );
     }
 
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 6cb73347b6..90c1dd9a26 100644
--- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -35,6 +35,9 @@ import static org.mockito.Mockito.when;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.event.EventListener;
@@ -48,7 +51,6 @@ import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
@@ -66,7 +68,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
  */
 @ExtendWith(MockitoExtension.class)
 public class ReplicaManagerTest extends BaseIgniteAbstractTest {
-    private StripedThreadPoolExecutor requestsExecutor;
+    private ExecutorService requestsExecutor;
 
     private ReplicaManager replicaManager;
 
@@ -90,11 +92,11 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest {
 
         var clock = new HybridClockImpl();
 
-        requestsExecutor = new StripedThreadPoolExecutor(
-                5,
-                NamedThreadFactory.create(nodeName, "partition-operations", log),
-                false,
-                0
+        requestsExecutor = new ThreadPoolExecutor(
+                0, 5,
+                0, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "partition-operations", log)
         );
 
         replicaManager = new ReplicaManager(nodeName, clusterService, cmgManager, clock, Set.of(), placementDriver, requestsExecutor);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index 1442302f9c..a3f2aa27e9 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.util.IgniteUtils;
 
 /**
@@ -49,7 +48,10 @@ public class ThreadPoolsManager implements IgniteComponent {
      */
     private final ExecutorService tableIoExecutor;
 
-    private final StripedThreadPoolExecutor partitionOperationsExecutor;
+    /**
+     * Executor on which partition operations are executed. Might do storage reads and writes (so it's expected to execute disk I/O).
+     */
+    private final ExecutorService partitionOperationsExecutor;
 
     private final ScheduledExecutorService commonScheduler;
 
@@ -67,11 +69,13 @@ public class ThreadPoolsManager implements IgniteComponent {
                 new LinkedBlockingQueue<>(),
                 IgniteThreadFactory.create(nodeName, "tableManager-io", LOG, STORAGE_READ, STORAGE_WRITE));
 
-        partitionOperationsExecutor = new StripedThreadPoolExecutor(
-                Math.min(cpus * 3, 25),
-                IgniteThreadFactory.create(nodeName, "partition-operations", LOG, STORAGE_READ, STORAGE_WRITE),
-                false,
-                0
+        int partitionsOperationsThreads = Math.min(cpus * 3, 25);
+        partitionOperationsExecutor = new ThreadPoolExecutor(
+                partitionsOperationsThreads,
+                partitionsOperationsThreads,
+                0, SECONDS,
+                new LinkedBlockingQueue<>(),
+                IgniteThreadFactory.create(nodeName, "partition-operations", LOG, STORAGE_READ, STORAGE_WRITE)
         );
 
         commonScheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, "common-scheduler", LOG));
@@ -100,7 +104,7 @@ public class ThreadPoolsManager implements IgniteComponent {
     /**
      * Returns the executor of partition operations.
      */
-    public StripedThreadPoolExecutor partitionOperationsExecutor() {
+    public ExecutorService partitionOperationsExecutor() {
         return partitionOperationsExecutor;
     }
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 7595ba6d11..42d923b145 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -40,6 +40,9 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -71,7 +74,6 @@ import org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -110,7 +112,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
 
     private ClusterService clusterService;
 
-    private StripedThreadPoolExecutor requestsExecutor;
+    private ExecutorService requestsExecutor;
 
     @BeforeEach
     public void setup() {
@@ -125,11 +127,11 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
         // This test is run without Meta storage.
         when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
 
-        requestsExecutor = new StripedThreadPoolExecutor(
-                5,
-                NamedThreadFactory.create(NODE_NAME, "partition-operations", log),
-                false,
-                0
+        requestsExecutor = new ThreadPoolExecutor(
+                0, 5,
+                0, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(NODE_NAME, "partition-operations", log)
         );
 
         replicaService = new ReplicaService(clusterService.messagingService(), clock);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index af81a79c00..b6957a5119 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -136,10 +136,8 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
-import org.apache.ignite.internal.replicator.ChooseExecutorForReplicationGroup;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
@@ -178,10 +176,8 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
 import org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
 import org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
-import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.LockManager;
@@ -372,9 +368,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     private final StorageUpdateConfiguration storageUpdateConfig;
 
     /**
-     * Chooses a stripe to execute partition operations (that might cause I/O and/or be blocked on locks).
+     * Executes partition operations (that might cause I/O and/or be blocked on locks).
      */
-    private final ExecutorChooser<ReplicationGroupId> partitionOperationsStripeChooser;
+    private final Executor partitionOperationsExecutor;
 
     /** Marshallers provider. */
     private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
@@ -425,7 +421,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             SchemaManager schemaManager,
             LogStorageFactoryCreator volatileLogStorageFactoryCreator,
             ExecutorService ioExecutor,
-            StripedThreadPoolExecutor partitionOperationsExecutor,
+            Executor partitionOperationsExecutor,
             HybridClock clock,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
@@ -449,19 +445,18 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         this.schemaManager = schemaManager;
         this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
         this.ioExecutor = ioExecutor;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.clock = clock;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.raftGroupServiceFactory = raftGroupServiceFactory;
         this.distributionZoneManager = distributionZoneManager;
-        this.schemaSyncService = schemaSyncService;
+        this.schemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
         this.catalogService = catalogService;
         this.observableTimestampTracker = observableTimestampTracker;
-        this.placementDriver = placementDriver;
+        this.placementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
         this.sql = sql;
         this.storageUpdateConfig = storageUpdateConfig;
 
-        partitionOperationsStripeChooser = new ChooseExecutorForReplicationGroup(partitionOperationsExecutor);
-
         TopologyService topologyService = clusterService.topologyService();
 
         TxMessageSender txMessageSender = new TxMessageSender(clusterService.messagingService(), replicaSvc, clock);
@@ -974,11 +969,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         int tableId = tablePartitionId.tableId();
         int partId = tablePartitionId.partitionId();
 
-        Executor partitionOperationsStripe = partitionOperationsStripeChooser.choose(tablePartitionId);
-
         return new PartitionReplicaListener(
                 mvPartitionStorage,
-                new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsStripe),
+                new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor),
                 txManager,
                 lockMgr,
                 scanRequestExecutor,
@@ -994,9 +987,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 partitionUpdateHandlers.storageUpdateHandler,
                 new CatalogValidationSchemasSource(catalogService, schemaManager),
                 localNode(),
-                new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsStripe),
+                schemaSyncService,
                 catalogService,
-                new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsStripe),
+                placementDriver,
                 clusterService.topologyService()
         );
     }
@@ -1470,7 +1463,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         HybridTimestamp now = clock.now();
 
         return orStopManagerFuture(schemaSyncService.waitForMetadataCompleteness(now))
-                .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> {
+                .thenCompose(unused -> inBusyLockAsync(busyLock, () -> {
                     int catalogVersion = catalogService.activeCatalogVersion(now.longValue());
 
                     Collection<CatalogTableDescriptor> tableDescriptors = catalogService.tables(catalogVersion);
@@ -1484,7 +1477,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                             .toArray(CompletableFuture[]::new);
 
                     return CompletableFutures.allOf(tableImplFutures);
-                }), ioExecutor);
+                }));
     }
 
     /**
@@ -1570,7 +1563,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             HybridTimestamp now = clock.now();
 
             return orStopManagerFuture(schemaSyncService.waitForMetadataCompleteness(now))
-                    .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> {
+                    .thenCompose(unused -> inBusyLockAsync(busyLock, () -> {
                         int catalogVersion = catalogService.activeCatalogVersion(now.longValue());
 
                         // Check if the table has been deleted.
@@ -1579,7 +1572,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         }
 
                         return tableAsyncInternalBusy(tableId);
-                    }), ioExecutor);
+                    }));
         });
     }
 
@@ -1622,7 +1615,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             HybridTimestamp now = clock.now();
 
             return orStopManagerFuture(schemaSyncService.waitForMetadataCompleteness(now))
-                    .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> {
+                    .thenCompose(unused -> inBusyLockAsync(busyLock, () -> {
                         CatalogTableDescriptor tableDescriptor = catalogService.table(name, now.longValue());
 
                         // Check if the table has been deleted.
@@ -1631,7 +1624,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         }
 
                         return tableAsyncInternalBusy(tableDescriptor.id());
-                    }), ioExecutor);
+                    }));
         });
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 493736fc0b..00a75d44e5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -62,8 +62,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
@@ -117,7 +120,6 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
 import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -236,7 +238,7 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Catalog manager. */
     private CatalogManager catalogManager;
 
-    private StripedThreadPoolExecutor partitionOperationsExecutor;
+    private ExecutorService partitionOperationsExecutor;
 
     @BeforeEach
     void before() throws NodeStoppingException {
@@ -266,11 +268,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         mockMetastore();
 
-        partitionOperationsExecutor = new StripedThreadPoolExecutor(
-                5,
-                IgniteThreadFactory.create("test", "partition-operations", log, STORAGE_READ, STORAGE_WRITE),
-                false,
-                0
+        partitionOperationsExecutor = new ThreadPoolExecutor(
+                0, 5,
+                0, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                IgniteThreadFactory.create("test", "partition-operations", log, STORAGE_READ, STORAGE_WRITE)
         );
     }
 
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 63ffe63804..36387a98c0 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -52,7 +52,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -124,7 +127,6 @@ import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -213,7 +215,7 @@ public class ItTxTestCluster {
 
     private ScheduledThreadPoolExecutor executor;
 
-    private StripedThreadPoolExecutor partitionOperationsExecutor;
+    private ExecutorService partitionOperationsExecutor;
 
     protected IgniteTransactions igniteTransactions;
 
@@ -332,11 +334,11 @@ public class ItTxTestCluster {
         executor = new ScheduledThreadPoolExecutor(20,
                 new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
 
-        partitionOperationsExecutor = new StripedThreadPoolExecutor(
-                20,
-                NamedThreadFactory.create("test", "partition-operations", LOG),
-                false,
-                0
+        partitionOperationsExecutor = new ThreadPoolExecutor(
+                0, 20,
+                0, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create("test", "partition-operations", LOG)
         );
 
         for (int i = 0; i < nodes; i++) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 348d110397..d0c969d101 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -78,7 +78,6 @@ import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
-import org.apache.ignite.internal.replicator.ChooseExecutorForReplicationGroup;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -88,9 +87,7 @@ import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti
 import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
-import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LocalRwTxCounter;
@@ -207,47 +204,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
     /** Counter of read-write transactions that were created and completed locally on the node. */
     private final LocalRwTxCounter localRwTxCounter;
 
-    private final ExecutorChooser<ReplicationGroupId> partitionOperationsStripeChooser;
-
-    /**
-     * The constructor.
-     *
-     * @param txConfig Transaction configuration.
-     * @param clusterService Cluster service.
-     * @param replicaService Replica service.
-     * @param lockManager Lock manager.
-     * @param clock A hybrid logical clock.
-     * @param transactionIdGenerator Used to generate transaction IDs.
-     * @param placementDriver Placement driver.
-     * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms.
-     * @param localRwTxCounter Counter of read-write transactions that were created and completed locally on the node.
-     * @param partitionOperationsExecutor Executor on which partition operations will be executed, if needed.
-     */
-    public TxManagerImpl(
-            TransactionConfiguration txConfig,
-            ClusterService clusterService,
-            ReplicaService replicaService,
-            LockManager lockManager,
-            HybridClock clock,
-            TransactionIdGenerator transactionIdGenerator,
-            PlacementDriver placementDriver,
-            LongSupplier idleSafeTimePropagationPeriodMsSupplier,
-            LocalRwTxCounter localRwTxCounter,
-            StripedThreadPoolExecutor partitionOperationsExecutor
-    ) {
-        this(
-                txConfig,
-                clusterService,
-                replicaService,
-                lockManager,
-                clock,
-                transactionIdGenerator,
-                placementDriver,
-                idleSafeTimePropagationPeriodMsSupplier,
-                localRwTxCounter,
-                new ChooseExecutorForReplicationGroup(partitionOperationsExecutor)
-        );
-    }
+    private final Executor partitionOperationsExecutor;
 
     /**
      * Test-only constructor.
@@ -284,7 +241,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                 placementDriver,
                 idleSafeTimePropagationPeriodMsSupplier,
                 localRwTxCounter,
-                groupId -> ForkJoinPool.commonPool()
+                ForkJoinPool.commonPool()
         );
     }
 
@@ -300,9 +257,9 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
      * @param placementDriver Placement driver.
      * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms.
      * @param localRwTxCounter Counter of read-write transactions that were created and completed locally on the node.
-     * @param partitionOperationsStripeChooser Chooser of an executor on which partition operations will be executed, if needed.
+     * @param partitionOperationsExecutor Executor on which partition operations will be executed, if needed.
      */
-    private TxManagerImpl(
+    public TxManagerImpl(
             TransactionConfiguration txConfig,
             ClusterService clusterService,
             ReplicaService replicaService,
@@ -312,7 +269,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
             PlacementDriver placementDriver,
             LongSupplier idleSafeTimePropagationPeriodMsSupplier,
             LocalRwTxCounter localRwTxCounter,
-            ExecutorChooser<ReplicationGroupId> partitionOperationsStripeChooser
+            Executor partitionOperationsExecutor
     ) {
         this.txConfig = txConfig;
         this.lockManager = lockManager;
@@ -324,7 +281,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
         this.messagingService = clusterService.messagingService();
         this.primaryReplicaEventListener = this::primaryReplicaEventListener;
         this.localRwTxCounter = localRwTxCounter;
-        this.partitionOperationsStripeChooser = partitionOperationsStripeChooser;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, clock);
 
@@ -666,7 +623,6 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                         if (TransactionFailureHandler.isRecoverable(cause)) {
                             LOG.warn("Failed to finish Tx. The operation will be retried [txId={}].", ex, txId);
 
-                            Executor stripe = partitionOperationsStripeChooser.choose(commitPartition);
                             return supplyAsync(() -> durableFinish(
                                     observableTimestampTracker,
                                     commitPartition,
@@ -675,7 +631,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                                     txId,
                                     commitTimestamp,
                                     txFinishFuture
-                            ), stripe).thenCompose(identity());
+                            ), partitionOperationsExecutor).thenCompose(identity());
                         } else {
                             LOG.warn("Failed to finish Tx [txId={}].", ex, txId);