You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2024/02/23 16:56:03 UTC
(ignite-3) branch main updated: IGNITE-21600 Make partition-operations a plain executor (#3281)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4484f549d6 IGNITE-21600 Make partition-operations a plain executor (#3281)
4484f549d6 is described below
commit 4484f549d677b20fa8e6f677555f2f8429c725aa
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Feb 23 20:55:58 2024 +0400
IGNITE-21600 Make partition-operations a plain executor (#3281)
---
.../ItPlacementDriverReplicaSideTest.java | 16 +++---
.../ignite/internal/replicator/ReplicaManager.java | 18 ++-----
.../ignite/internal/replicator/ReplicaService.java | 14 ++----
.../internal/replicator/ReplicaManagerTest.java | 16 +++---
.../ignite/internal/app/ThreadPoolsManager.java | 20 +++++---
.../ignite/distributed/ReplicaUnavailableTest.java | 16 +++---
.../internal/table/distributed/TableManager.java | 37 ++++++--------
.../table/distributed/TableManagerTest.java | 16 +++---
.../apache/ignite/distributed/ItTxTestCluster.java | 16 +++---
.../ignite/internal/tx/impl/TxManagerImpl.java | 58 +++-------------------
10 files changed, 88 insertions(+), 139 deletions(-)
diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 2e46b2749c..e01245edb5 100644
--- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -40,6 +40,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -73,7 +76,6 @@ import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.TestReplicaMessagesFactory;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.topology.LogicalTopologyServiceTestImpl;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -117,18 +119,18 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest {
private final Map<String, Loza> raftManagers = new HashMap<>();
private final Map<String, TopologyAwareRaftGroupServiceFactory> raftClientFactory = new HashMap<>();
- private StripedThreadPoolExecutor partitionOperationsExecutor;
+ private ExecutorService partitionOperationsExecutor;
/** List of services to have to close before the test will be completed. */
private final List<Closeable> servicesToClose = new ArrayList<>();
@BeforeEach
public void beforeTest(TestInfo testInfo) {
- partitionOperationsExecutor = new StripedThreadPoolExecutor(
- 20,
- NamedThreadFactory.create("test", "partition-operations", log),
- false,
- 0
+ partitionOperationsExecutor = new ThreadPoolExecutor(
+ 0, 20,
+ 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create("test", "partition-operations", log)
);
placementDriverNodeNames = IntStream.range(BASE_PORT, BASE_PORT + 3).mapToObj(port -> testNodeName(testInfo, port))
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 3ec6fa1a05..d193cad4d5 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -72,9 +72,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
@@ -135,11 +133,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
/** Scheduled executor for idle safe time sync. */
private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor;
- /**
- * Chooses a stripe using {@link ReplicationGroupStripes#stripeFor(ReplicationGroupId, StripedThreadPoolExecutor)}
- * so that requests concerning the same {@link ReplicationGroupId} are executed on the same thread.
- */
- private final ExecutorChooser<ReplicationGroupId> requestStripeChooser;
+ private final Executor requestsExecutor;
/** Set of message groups to handler as replica requests. */
private final Set<Class<?>> messageGroupsToHandle;
@@ -168,7 +162,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
HybridClock clock,
Set<Class<?>> messageGroupsToHandle,
PlacementDriver placementDriver,
- StripedThreadPoolExecutor requestsExecutor
+ Executor requestsExecutor
) {
this(
nodeName,
@@ -201,7 +195,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
HybridClock clock,
Set<Class<?>> messageGroupsToHandle,
PlacementDriver placementDriver,
- StripedThreadPoolExecutor requestsExecutor,
+ Executor requestsExecutor,
LongSupplier idleSafeTimePropagationPeriodMsSupplier
) {
this.clusterNetSvc = clusterNetSvc;
@@ -211,10 +205,9 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
this.handler = this::onReplicaMessageReceived;
this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived;
this.placementDriver = placementDriver;
+ this.requestsExecutor = requestsExecutor;
this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier;
- requestStripeChooser = new ChooseExecutorForReplicationGroup(requestsExecutor);
-
scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
1,
NamedThreadFactory.create(nodeName, "scheduled-idle-safe-time-sync-thread", LOG)
@@ -241,8 +234,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
ReplicaRequest request = (ReplicaRequest) message;
- Executor stripeExecutor = requestStripeChooser.choose(request.groupId());
- stripeExecutor.execute(() -> handleReplicaRequest(request, senderConsistentId, correlationId));
+ requestsExecutor.execute(() -> handleReplicaRequest(request, senderConsistentId, correlationId));
}
private void handleReplicaRequest(ReplicaRequest request, String senderConsistentId, @Nullable Long correlationId) {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index b1889fba29..a4b981cfaf 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -42,7 +43,6 @@ import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.network.ClusterNode;
/** The service is intended to execute requests on replicas. */
@@ -88,7 +88,7 @@ public class ReplicaService {
MessagingService messagingService,
HybridClock clock,
String localConsistentId,
- StripedThreadPoolExecutor executor
+ Executor executor
) {
this(
wrapMessagingService(messagingService, localConsistentId, executor),
@@ -96,17 +96,11 @@ public class ReplicaService {
);
}
- private static JumpToExecutorByConsistentIdAfterSend wrapMessagingService(
- MessagingService messagingService,
- String localConsistentId,
- StripedThreadPoolExecutor executor
- ) {
- ChooseExecutorForReplicationGroup chooserByGroupId = new ChooseExecutorForReplicationGroup(executor);
-
+ private static MessagingService wrapMessagingService(MessagingService messagingService, String localConsistentId, Executor executor) {
return new JumpToExecutorByConsistentIdAfterSend(
messagingService,
localConsistentId,
- request -> chooserByGroupId.choose(((ReplicaRequest) request).groupId())
+ request -> executor
);
}
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 6cb73347b6..90c1dd9a26 100644
--- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -35,6 +35,9 @@ import static org.mockito.Mockito.when;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.EventListener;
@@ -48,7 +51,6 @@ import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
@@ -66,7 +68,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public class ReplicaManagerTest extends BaseIgniteAbstractTest {
- private StripedThreadPoolExecutor requestsExecutor;
+ private ExecutorService requestsExecutor;
private ReplicaManager replicaManager;
@@ -90,11 +92,11 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest {
var clock = new HybridClockImpl();
- requestsExecutor = new StripedThreadPoolExecutor(
- 5,
- NamedThreadFactory.create(nodeName, "partition-operations", log),
- false,
- 0
+ requestsExecutor = new ThreadPoolExecutor(
+ 0, 5,
+ 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create(nodeName, "partition-operations", log)
);
replicaManager = new ReplicaManager(nodeName, clusterService, cmgManager, clock, Set.of(), placementDriver, requestsExecutor);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index 1442302f9c..a3f2aa27e9 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.util.IgniteUtils;
/**
@@ -49,7 +48,10 @@ public class ThreadPoolsManager implements IgniteComponent {
*/
private final ExecutorService tableIoExecutor;
- private final StripedThreadPoolExecutor partitionOperationsExecutor;
+ /**
+ * Executor on which partition operations are executed. Might do storage reads and writes (so it's expected to execute disk I/O).
+ */
+ private final ExecutorService partitionOperationsExecutor;
private final ScheduledExecutorService commonScheduler;
@@ -67,11 +69,13 @@ public class ThreadPoolsManager implements IgniteComponent {
new LinkedBlockingQueue<>(),
IgniteThreadFactory.create(nodeName, "tableManager-io", LOG, STORAGE_READ, STORAGE_WRITE));
- partitionOperationsExecutor = new StripedThreadPoolExecutor(
- Math.min(cpus * 3, 25),
- IgniteThreadFactory.create(nodeName, "partition-operations", LOG, STORAGE_READ, STORAGE_WRITE),
- false,
- 0
+ int partitionsOperationsThreads = Math.min(cpus * 3, 25);
+ partitionOperationsExecutor = new ThreadPoolExecutor(
+ partitionsOperationsThreads,
+ partitionsOperationsThreads,
+ 0, SECONDS,
+ new LinkedBlockingQueue<>(),
+ IgniteThreadFactory.create(nodeName, "partition-operations", LOG, STORAGE_READ, STORAGE_WRITE)
);
commonScheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, "common-scheduler", LOG));
@@ -100,7 +104,7 @@ public class ThreadPoolsManager implements IgniteComponent {
/**
* Returns the executor of partition operations.
*/
- public StripedThreadPoolExecutor partitionOperationsExecutor() {
+ public ExecutorService partitionOperationsExecutor() {
return partitionOperationsExecutor;
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 7595ba6d11..42d923b145 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -40,6 +40,9 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -71,7 +74,6 @@ import org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypes;
@@ -110,7 +112,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
private ClusterService clusterService;
- private StripedThreadPoolExecutor requestsExecutor;
+ private ExecutorService requestsExecutor;
@BeforeEach
public void setup() {
@@ -125,11 +127,11 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
// This test is run without Meta storage.
when(cmgManager.metaStorageNodes()).thenReturn(emptySetCompletedFuture());
- requestsExecutor = new StripedThreadPoolExecutor(
- 5,
- NamedThreadFactory.create(NODE_NAME, "partition-operations", log),
- false,
- 0
+ requestsExecutor = new ThreadPoolExecutor(
+ 0, 5,
+ 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create(NODE_NAME, "partition-operations", log)
);
replicaService = new ReplicaService(clusterService.messagingService(), clock);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index af81a79c00..b6957a5119 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -136,10 +136,8 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
-import org.apache.ignite.internal.replicator.ChooseExecutorForReplicationGroup;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
@@ -178,10 +176,8 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
import org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
-import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
@@ -372,9 +368,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
private final StorageUpdateConfiguration storageUpdateConfig;
/**
- * Chooses a stripe to execute partition operations (that might cause I/O and/or be blocked on locks).
+ * Executes partition operations (that might cause I/O and/or be blocked on locks).
*/
- private final ExecutorChooser<ReplicationGroupId> partitionOperationsStripeChooser;
+ private final Executor partitionOperationsExecutor;
/** Marshallers provider. */
private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
@@ -425,7 +421,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
SchemaManager schemaManager,
LogStorageFactoryCreator volatileLogStorageFactoryCreator,
ExecutorService ioExecutor,
- StripedThreadPoolExecutor partitionOperationsExecutor,
+ Executor partitionOperationsExecutor,
HybridClock clock,
OutgoingSnapshotsManager outgoingSnapshotsManager,
TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
@@ -449,19 +445,18 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
this.schemaManager = schemaManager;
this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
this.ioExecutor = ioExecutor;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
this.clock = clock;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
this.raftGroupServiceFactory = raftGroupServiceFactory;
this.distributionZoneManager = distributionZoneManager;
- this.schemaSyncService = schemaSyncService;
+ this.schemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
this.catalogService = catalogService;
this.observableTimestampTracker = observableTimestampTracker;
- this.placementDriver = placementDriver;
+ this.placementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
this.sql = sql;
this.storageUpdateConfig = storageUpdateConfig;
- partitionOperationsStripeChooser = new ChooseExecutorForReplicationGroup(partitionOperationsExecutor);
-
TopologyService topologyService = clusterService.topologyService();
TxMessageSender txMessageSender = new TxMessageSender(clusterService.messagingService(), replicaSvc, clock);
@@ -974,11 +969,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
int tableId = tablePartitionId.tableId();
int partId = tablePartitionId.partitionId();
- Executor partitionOperationsStripe = partitionOperationsStripeChooser.choose(tablePartitionId);
-
return new PartitionReplicaListener(
mvPartitionStorage,
- new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsStripe),
+ new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor),
txManager,
lockMgr,
scanRequestExecutor,
@@ -994,9 +987,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
partitionUpdateHandlers.storageUpdateHandler,
new CatalogValidationSchemasSource(catalogService, schemaManager),
localNode(),
- new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsStripe),
+ schemaSyncService,
catalogService,
- new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsStripe),
+ placementDriver,
clusterService.topologyService()
);
}
@@ -1470,7 +1463,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
HybridTimestamp now = clock.now();
return orStopManagerFuture(schemaSyncService.waitForMetadataCompleteness(now))
- .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> {
+ .thenCompose(unused -> inBusyLockAsync(busyLock, () -> {
int catalogVersion = catalogService.activeCatalogVersion(now.longValue());
Collection<CatalogTableDescriptor> tableDescriptors = catalogService.tables(catalogVersion);
@@ -1484,7 +1477,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
.toArray(CompletableFuture[]::new);
return CompletableFutures.allOf(tableImplFutures);
- }), ioExecutor);
+ }));
}
/**
@@ -1570,7 +1563,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
HybridTimestamp now = clock.now();
return orStopManagerFuture(schemaSyncService.waitForMetadataCompleteness(now))
- .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> {
+ .thenCompose(unused -> inBusyLockAsync(busyLock, () -> {
int catalogVersion = catalogService.activeCatalogVersion(now.longValue());
// Check if the table has been deleted.
@@ -1579,7 +1572,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
}
return tableAsyncInternalBusy(tableId);
- }), ioExecutor);
+ }));
});
}
@@ -1622,7 +1615,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
HybridTimestamp now = clock.now();
return orStopManagerFuture(schemaSyncService.waitForMetadataCompleteness(now))
- .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> {
+ .thenCompose(unused -> inBusyLockAsync(busyLock, () -> {
CatalogTableDescriptor tableDescriptor = catalogService.table(name, now.longValue());
// Check if the table has been deleted.
@@ -1631,7 +1624,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
}
return tableAsyncInternalBusy(tableDescriptor.id());
- }), ioExecutor);
+ }));
});
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 493736fc0b..00a75d44e5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -62,8 +62,11 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
@@ -117,7 +120,6 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -236,7 +238,7 @@ public class TableManagerTest extends IgniteAbstractTest {
/** Catalog manager. */
private CatalogManager catalogManager;
- private StripedThreadPoolExecutor partitionOperationsExecutor;
+ private ExecutorService partitionOperationsExecutor;
@BeforeEach
void before() throws NodeStoppingException {
@@ -266,11 +268,11 @@ public class TableManagerTest extends IgniteAbstractTest {
mockMetastore();
- partitionOperationsExecutor = new StripedThreadPoolExecutor(
- 5,
- IgniteThreadFactory.create("test", "partition-operations", log, STORAGE_READ, STORAGE_WRITE),
- false,
- 0
+ partitionOperationsExecutor = new ThreadPoolExecutor(
+ 0, 5,
+ 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ IgniteThreadFactory.create("test", "partition-operations", log, STORAGE_READ, STORAGE_WRITE)
);
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 63ffe63804..36387a98c0 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -52,7 +52,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -124,7 +127,6 @@ import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -213,7 +215,7 @@ public class ItTxTestCluster {
private ScheduledThreadPoolExecutor executor;
- private StripedThreadPoolExecutor partitionOperationsExecutor;
+ private ExecutorService partitionOperationsExecutor;
protected IgniteTransactions igniteTransactions;
@@ -332,11 +334,11 @@ public class ItTxTestCluster {
executor = new ScheduledThreadPoolExecutor(20,
new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
- partitionOperationsExecutor = new StripedThreadPoolExecutor(
- 20,
- NamedThreadFactory.create("test", "partition-operations", LOG),
- false,
- 0
+ partitionOperationsExecutor = new ThreadPoolExecutor(
+ 0, 20,
+ 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create("test", "partition-operations", LOG)
);
for (int i = 0; i < nodes; i++) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 348d110397..d0c969d101 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -78,7 +78,6 @@ import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
-import org.apache.ignite.internal.replicator.ChooseExecutorForReplicationGroup;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -88,9 +87,7 @@ import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti
import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
-import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LocalRwTxCounter;
@@ -207,47 +204,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
/** Counter of read-write transactions that were created and completed locally on the node. */
private final LocalRwTxCounter localRwTxCounter;
- private final ExecutorChooser<ReplicationGroupId> partitionOperationsStripeChooser;
-
- /**
- * The constructor.
- *
- * @param txConfig Transaction configuration.
- * @param clusterService Cluster service.
- * @param replicaService Replica service.
- * @param lockManager Lock manager.
- * @param clock A hybrid logical clock.
- * @param transactionIdGenerator Used to generate transaction IDs.
- * @param placementDriver Placement driver.
- * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms.
- * @param localRwTxCounter Counter of read-write transactions that were created and completed locally on the node.
- * @param partitionOperationsExecutor Executor on which partition operations will be executed, if needed.
- */
- public TxManagerImpl(
- TransactionConfiguration txConfig,
- ClusterService clusterService,
- ReplicaService replicaService,
- LockManager lockManager,
- HybridClock clock,
- TransactionIdGenerator transactionIdGenerator,
- PlacementDriver placementDriver,
- LongSupplier idleSafeTimePropagationPeriodMsSupplier,
- LocalRwTxCounter localRwTxCounter,
- StripedThreadPoolExecutor partitionOperationsExecutor
- ) {
- this(
- txConfig,
- clusterService,
- replicaService,
- lockManager,
- clock,
- transactionIdGenerator,
- placementDriver,
- idleSafeTimePropagationPeriodMsSupplier,
- localRwTxCounter,
- new ChooseExecutorForReplicationGroup(partitionOperationsExecutor)
- );
- }
+ private final Executor partitionOperationsExecutor;
/**
* Test-only constructor.
@@ -284,7 +241,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
placementDriver,
idleSafeTimePropagationPeriodMsSupplier,
localRwTxCounter,
- groupId -> ForkJoinPool.commonPool()
+ ForkJoinPool.commonPool()
);
}
@@ -300,9 +257,9 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
* @param placementDriver Placement driver.
* @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms.
* @param localRwTxCounter Counter of read-write transactions that were created and completed locally on the node.
- * @param partitionOperationsStripeChooser Chooser of an executor on which partition operations will be executed, if needed.
+ * @param partitionOperationsExecutor Executor on which partition operations will be executed, if needed.
*/
- private TxManagerImpl(
+ public TxManagerImpl(
TransactionConfiguration txConfig,
ClusterService clusterService,
ReplicaService replicaService,
@@ -312,7 +269,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
PlacementDriver placementDriver,
LongSupplier idleSafeTimePropagationPeriodMsSupplier,
LocalRwTxCounter localRwTxCounter,
- ExecutorChooser<ReplicationGroupId> partitionOperationsStripeChooser
+ Executor partitionOperationsExecutor
) {
this.txConfig = txConfig;
this.lockManager = lockManager;
@@ -324,7 +281,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
this.messagingService = clusterService.messagingService();
this.primaryReplicaEventListener = this::primaryReplicaEventListener;
this.localRwTxCounter = localRwTxCounter;
- this.partitionOperationsStripeChooser = partitionOperationsStripeChooser;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
placementDriverHelper = new PlacementDriverHelper(placementDriver, clock);
@@ -666,7 +623,6 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
if (TransactionFailureHandler.isRecoverable(cause)) {
LOG.warn("Failed to finish Tx. The operation will be retried [txId={}].", ex, txId);
- Executor stripe = partitionOperationsStripeChooser.choose(commitPartition);
return supplyAsync(() -> durableFinish(
observableTimestampTracker,
commitPartition,
@@ -675,7 +631,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
txId,
commitTimestamp,
txFinishFuture
- ), stripe).thenCompose(identity());
+ ), partitionOperationsExecutor).thenCompose(identity());
} else {
LOG.warn("Failed to finish Tx [txId={}].", ex, txId);