You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/02/01 11:00:39 UTC
[ignite-3] branch main updated: IGNITE-17958 PartitionListener should not close partition storage (#1599)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 600f420c95 IGNITE-17958 PartitionListener should not close partition storage (#1599)
600f420c95 is described below
commit 600f420c95b93d4dc5e6a8af368be6996693262a
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Feb 1 14:00:33 2023 +0300
IGNITE-17958 PartitionListener should not close partition storage (#1599)
---
.../internal/table/distributed/TableManager.java | 55 +++++++++++++---------
.../table/distributed/raft/PartitionListener.java | 6 ---
2 files changed, 32 insertions(+), 29 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 91e38e7649..c6f3df376c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -21,6 +21,7 @@ import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
@@ -1219,16 +1220,20 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
assert table != null : IgniteStringFormatter.format("There is no table with the name specified [name={}, id={}]",
name, tblId);
- CompletableFuture<Void> destroyMvStorageFuture = table.internalTable().storage().destroy();
-
- table.internalTable().txStateStorage().destroy();
+ CompletableFuture<Void> destroyTableStoragesFuture = allOf(
+ table.internalTable().storage().destroy(),
+ runAsync(() -> table.internalTable().txStateStorage().destroy(), ioExecutor)
+ );
CompletableFuture<?> dropSchemaRegistryFuture = schemaManager.dropRegistry(causalityToken, table.tableId())
.thenCompose(
v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, table)))
);
- beforeTablesVvComplete.add(allOf(destroyMvStorageFuture, dropSchemaRegistryFuture));
+ // TODO: IGNITE-18687 Must be asynchronous
+ destroyTableStoragesFuture.join();
+
+ beforeTablesVvComplete.add(allOf(destroyTableStoragesFuture, dropSchemaRegistryFuture));
} catch (Exception e) {
fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tblId, name), e);
}
@@ -1952,12 +1957,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), new WatchListener() {
@Override
public void onUpdate(WatchEvent evt) {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(new NodeStoppingException());
- }
-
- try {
- assert evt.single();
+ inBusyLock(busyLock, () -> {
+ assert evt.single() : evt;
Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
@@ -1965,19 +1966,21 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return;
}
- int part = extractPartitionNumber(stableAssignmentsWatchEvent.key());
- UUID tblId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
+ int partitionId = extractPartitionNumber(stableAssignmentsWatchEvent.key());
+ UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
- TablePartitionId replicaGrpId = new TablePartitionId(tblId, part);
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
Set<Assignment> stableAssignments = ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
- byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId),
- stableAssignmentsWatchEvent.revision()).join().value();
+ byte[] pendingAssignmentsFromMetaStorage = metaStorageMgr.get(
+ pendingPartAssignmentsKey(tablePartitionId),
+ stableAssignmentsWatchEvent.revision()
+ ).join().value();
- Set<Assignment> pendingAssignments = pendingFromMetastorage == null
+ Set<Assignment> pendingAssignments = pendingAssignmentsFromMetaStorage == null
? Set.of()
- : ByteUtils.fromBytes(pendingFromMetastorage);
+ : ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
String localMemberName = clusterService.topologyService().localMember().name();
@@ -1986,16 +1989,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (shouldStopLocalServices) {
try {
- raftMgr.stopRaftNodes(replicaGrpId);
+ raftMgr.stopRaftNodes(tablePartitionId);
- replicaMgr.stopReplica(new TablePartitionId(tblId, part));
+ replicaMgr.stopReplica(tablePartitionId);
} catch (NodeStoppingException e) {
// no-op
}
+
+ InternalTable internalTable = tablesByIdVv.latest().get(tableId).internalTable();
+
+ // Should be done fairly quickly.
+ allOf(
+ internalTable.storage().destroyPartition(partitionId),
+ runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+ ).join();
}
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
@Override
@@ -2083,7 +2092,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS
|| txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {
- return CompletableFuture.allOf(
+ return allOf(
table.internalTable().storage().clearPartition(partitionId),
txStateStorage.clear()
).thenApply(unused -> new PartitionStorages(mvPartitionStorage, txStateStorage));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 37fe4cc51b..9514245eb7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -416,12 +416,6 @@ public class PartitionListener implements RaftGroupListener {
@Override
public void onShutdown() {
- // TODO: IGNITE-17958 - probably, we should not close the storage here as PartitionListener did not create the storage.
- try {
- storage.close();
- } catch (RuntimeException e) {
- throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
- }
}
/**