You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/27 12:07:01 UTC
[ignite-3] branch main updated: IGNITE-18633 Storage cleanup integration if one of them has not finished rebalancing with TableManager (#1586)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 84a65363c6 IGNITE-18633 Storage cleanup integration if one of them has not finished rebalancing with TableManager (#1586)
84a65363c6 is described below
commit 84a65363c690071e4d84af346672d35648b7ac88
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 27 15:06:54 2023 +0300
IGNITE-18633 Storage cleanup integration if one of them has not finished rebalancing with TableManager (#1586)
---
.../internal/table/distributed/TableManager.java | 106 +++++++++------------
.../distributed/storage/PartitionStorages.java | 55 +++++++++++
2 files changed, 102 insertions(+), 59 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c2296a8e2a..91e38e7649 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -131,6 +131,7 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -699,10 +700,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
- CompletableFuture<MvPartitionStorage> mvPartitionStorageFut = getOrCreateMvPartition(internalTbl.storage(), partId);
+ CompletableFuture<PartitionStorages> partitionStoragesFut = getOrCreatePartitionStorages(table, partId);
- CompletableFuture<PartitionDataStorage> partitionDataStorageFut = mvPartitionStorageFut
- .thenApply(mvPartitionStorage -> partitionDataStorage(mvPartitionStorage, internalTbl, partId));
+ CompletableFuture<PartitionDataStorage> partitionDataStorageFut = partitionStoragesFut
+ .thenApply(partitionStorages -> partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+ internalTbl, partId));
CompletableFuture<StorageUpdateHandler> storageUpdateHandlerFut = partitionDataStorageFut
.thenApply(storage -> new StorageUpdateHandler(partId, storage, table.indexStorageAdapters(partId)));
@@ -711,7 +713,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
// start new nodes, only if it is table creation, other cases will be covered by rebalance logic
if (oldPartAssignment.isEmpty() && localMemberAssignment != null) {
- startGroupFut = mvPartitionStorageFut.thenComposeAsync(mvPartitionStorage -> {
+ startGroupFut = partitionStoragesFut.thenComposeAsync(partitionStorages -> {
+ MvPartitionStorage mvPartitionStorage = partitionStorages.getMvPartitionStorage();
+
boolean hasData = mvPartitionStorage.lastAppliedIndex() > 0;
CompletableFuture<Boolean> fut;
@@ -752,8 +756,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return partitionDataStorageFut
.thenCompose(s -> storageUpdateHandlerFut)
- .thenCompose(s -> getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId))
- .thenAcceptAsync(txStatePartitionStorage -> {
+ .thenAcceptAsync(storageUpdateHandler -> {
+ TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
+
RaftGroupOptions groupOptions = groupOptionsForPartition(
internalTbl.storage(),
internalTbl.txStateStorage(),
@@ -766,7 +771,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
PartitionDataStorage partitionDataStorage = partitionDataStorageFut.join();
- StorageUpdateHandler storageUpdateHandler = storageUpdateHandlerFut.join();
try {
// TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
@@ -817,12 +821,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return completedFuture(null);
}
- CompletableFuture<TxStateStorage> txStateStorageFuture =
- getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId);
-
StorageUpdateHandler storageUpdateHandler = storageUpdateHandlerFut.join();
- return mvPartitionStorageFut.thenAcceptBoth(txStateStorageFuture, (partitionStorage, txStateStorage) -> {
+ return partitionStoragesFut.thenAccept(partitionStorages -> {
+ MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage();
+ TxStateStorage txStateStorage = partitionStorages.getTxStateStorage();
+
try {
replicaMgr.startReplica(replicaGrpId,
new PartitionReplicaListener(
@@ -1842,13 +1846,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
pendingAssignmentsWatchEvent.key(), partId, tbl.name(), localMember.address());
if (shouldStartLocalServices) {
- MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(internalTable.storage(), partId).join();
+ PartitionStorages partitionStorages = getOrCreatePartitionStorages(tbl, partId).join();
+
+ MvPartitionStorage mvPartitionStorage = partitionStorages.getMvPartitionStorage();
+ TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
+
PartitionDataStorage partitionDataStorage = partitionDataStorage(mvPartitionStorage, internalTable, partId);
StorageUpdateHandler storageUpdateHandler =
new StorageUpdateHandler(partId, partitionDataStorage, tbl.indexStorageAdapters(partId));
- TxStateStorage txStatePartitionStorage = getOrCreateTxStateStorage(internalTable.txStateStorage(), partId).join();
-
RaftGroupOptions groupOptions = groupOptionsForPartition(
internalTable.storage(),
internalTable.txStateStorage(),
@@ -2046,51 +2052,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
}
- /**
- * Returns or creates multi-versioned partition storage.
- *
- * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
- * in when the rebalance was interrupted.
- *
- * @param mvTableStorage Multi-versioned table storage.
- * @param partitionId Partition ID.
- * @return Future that will complete when the operation completes.
- */
- private CompletableFuture<MvPartitionStorage> getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
- // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
- // TODO: IGNITE-18633 Also think about waiting for index stores for a partition, see PartitionAccessImpl.startRebalance
- return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
- .thenCompose(mvPartitionStorage -> {
- if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS) {
- return mvTableStorage.clearPartition(partitionId).thenApply(unused -> mvPartitionStorage);
- } else {
- return completedFuture(mvPartitionStorage);
- }
- });
- }
-
- /**
- * Returns or creates transaction state storage for a partition.
- *
- * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
- * in when the rebalance was interrupted.
- *
- * @param txStateTableStorage Transaction state storage for a table.
- * @param partitionId Partition ID.
- * @return Future that will complete when the operation completes.
- */
- private CompletableFuture<TxStateStorage> getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partitionId) {
- // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
- return CompletableFuture.supplyAsync(() -> txStateTableStorage.getOrCreateTxStateStorage(partitionId), ioExecutor)
- .thenCompose(txStateStorage -> {
- if (txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {
- return txStateStorage.clear().thenApply(unused -> txStateStorage);
- } else {
- return completedFuture(txStateStorage);
- }
- });
- }
-
private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
var peers = new HashSet<String>();
var learners = new HashSet<String>();
@@ -2105,4 +2066,31 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return PeersAndLearners.fromConsistentIds(peers, learners);
}
+
+ /**
+ * Gets or creates the partition storages (multi-versioned and transaction state); if either has not finished a rebalance, both are cleared.
+ *
+ * @param table Table.
+ * @param partitionId Partition ID.
+ * @return Future that completes with the partition storages once any required clearing has finished.
+ */
+ // TODO: IGNITE-18619 Maybe we should wait here for index storages to be created; adding such a wait now makes the tests hang.
+ private CompletableFuture<PartitionStorages> getOrCreatePartitionStorages(TableImpl table, int partitionId) {
+ return CompletableFuture
+ .supplyAsync(() -> {
+ MvPartitionStorage mvPartitionStorage = table.internalTable().storage().getOrCreateMvPartition(partitionId);
+ TxStateStorage txStateStorage = table.internalTable().txStateStorage().getOrCreateTxStateStorage(partitionId);
+
+ if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS // An interrupted rebalance may leave garbage,
+ || txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) { // so clear BOTH storages, not just the unfinished one.
+ return CompletableFuture.allOf(
+ table.internalTable().storage().clearPartition(partitionId),
+ txStateStorage.clear()
+ ).thenApply(unused -> new PartitionStorages(mvPartitionStorage, txStateStorage));
+ } else {
+ return completedFuture(new PartitionStorages(mvPartitionStorage, txStateStorage));
+ }
+ }, ioExecutor) // Storage lookup/creation runs on ioExecutor rather than the caller's thread.
+ .thenCompose(Function.identity()); // Flatten the CompletableFuture<CompletableFuture<PartitionStorages>> produced above.
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
new file mode 100644
index 0000000000..cb8251c0ed
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+
+/**
+ * Holder for the storages of a single table partition: its multi-versioned storage and its transaction state storage.
+ */
+public class PartitionStorages {
+ private final MvPartitionStorage mvPartitionStorage;
+
+ private final TxStateStorage txStateStorage;
+
+ /**
+ * Creates a holder for the given partition storages.
+ *
+ * @param mvPartitionStorage Multi-versioned partition storage.
+ * @param txStateStorage Transaction state storage of the same partition.
+ */
+ public PartitionStorages(MvPartitionStorage mvPartitionStorage, TxStateStorage txStateStorage) {
+ this.mvPartitionStorage = mvPartitionStorage;
+ this.txStateStorage = txStateStorage;
+ }
+
+ /**
+ * Returns the multi-versioned partition storage passed to the constructor.
+ */
+ public MvPartitionStorage getMvPartitionStorage() {
+ return mvPartitionStorage;
+ }
+
+ /**
+ * Returns the transaction state storage passed to the constructor.
+ */
+ public TxStateStorage getTxStateStorage() {
+ return txStateStorage;
+ }
+}