You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/27 12:07:01 UTC

[ignite-3] branch main updated: IGNITE-18633 Storage cleanup integration if one of them has not finished rebalancing with TableManager (#1586)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 84a65363c6 IGNITE-18633 Storage cleanup integration if one of them has not finished rebalancing with TableManager (#1586)
84a65363c6 is described below

commit 84a65363c690071e4d84af346672d35648b7ac88
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 27 15:06:54 2023 +0300

    IGNITE-18633 Storage cleanup integration if one of them has not finished rebalancing with TableManager (#1586)
---
 .../internal/table/distributed/TableManager.java   | 106 +++++++++------------
 .../distributed/storage/PartitionStorages.java     |  55 +++++++++++
 2 files changed, 102 insertions(+), 59 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c2296a8e2a..91e38e7649 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -131,6 +131,7 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -699,10 +700,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
 
-                CompletableFuture<MvPartitionStorage> mvPartitionStorageFut = getOrCreateMvPartition(internalTbl.storage(), partId);
+                CompletableFuture<PartitionStorages> partitionStoragesFut = getOrCreatePartitionStorages(table, partId);
 
-                CompletableFuture<PartitionDataStorage> partitionDataStorageFut = mvPartitionStorageFut
-                        .thenApply(mvPartitionStorage -> partitionDataStorage(mvPartitionStorage, internalTbl, partId));
+                CompletableFuture<PartitionDataStorage> partitionDataStorageFut = partitionStoragesFut
+                        .thenApply(partitionStorages -> partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                                internalTbl, partId));
 
                 CompletableFuture<StorageUpdateHandler> storageUpdateHandlerFut = partitionDataStorageFut
                         .thenApply(storage -> new StorageUpdateHandler(partId, storage, table.indexStorageAdapters(partId)));
@@ -711,7 +713,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 // start new nodes, only if it is table creation, other cases will be covered by rebalance logic
                 if (oldPartAssignment.isEmpty() && localMemberAssignment != null) {
-                    startGroupFut = mvPartitionStorageFut.thenComposeAsync(mvPartitionStorage -> {
+                    startGroupFut = partitionStoragesFut.thenComposeAsync(partitionStorages -> {
+                        MvPartitionStorage mvPartitionStorage = partitionStorages.getMvPartitionStorage();
+
                         boolean hasData = mvPartitionStorage.lastAppliedIndex() > 0;
 
                         CompletableFuture<Boolean> fut;
@@ -752,8 +756,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                             return partitionDataStorageFut
                                     .thenCompose(s -> storageUpdateHandlerFut)
-                                    .thenCompose(s -> getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId))
-                                    .thenAcceptAsync(txStatePartitionStorage -> {
+                                    .thenAcceptAsync(storageUpdateHandler -> {
+                                        TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
+
                                         RaftGroupOptions groupOptions = groupOptionsForPartition(
                                                 internalTbl.storage(),
                                                 internalTbl.txStateStorage(),
@@ -766,7 +771,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                         var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
 
                                         PartitionDataStorage partitionDataStorage = partitionDataStorageFut.join();
-                                        StorageUpdateHandler storageUpdateHandler = storageUpdateHandlerFut.join();
 
                                         try {
                                             // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
@@ -817,12 +821,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                 return completedFuture(null);
                             }
 
-                            CompletableFuture<TxStateStorage> txStateStorageFuture =
-                                    getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId);
-
                             StorageUpdateHandler storageUpdateHandler = storageUpdateHandlerFut.join();
 
-                            return mvPartitionStorageFut.thenAcceptBoth(txStateStorageFuture, (partitionStorage, txStateStorage) -> {
+                            return partitionStoragesFut.thenAccept(partitionStorages -> {
+                                MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage();
+                                TxStateStorage txStateStorage = partitionStorages.getTxStateStorage();
+
                                 try {
                                     replicaMgr.startReplica(replicaGrpId,
                                             new PartitionReplicaListener(
@@ -1842,13 +1846,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             pendingAssignmentsWatchEvent.key(), partId, tbl.name(), localMember.address());
 
                     if (shouldStartLocalServices) {
-                        MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(internalTable.storage(), partId).join();
+                        PartitionStorages partitionStorages = getOrCreatePartitionStorages(tbl, partId).join();
+
+                        MvPartitionStorage mvPartitionStorage = partitionStorages.getMvPartitionStorage();
+                        TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
+
                         PartitionDataStorage partitionDataStorage = partitionDataStorage(mvPartitionStorage, internalTable, partId);
                         StorageUpdateHandler storageUpdateHandler =
                                 new StorageUpdateHandler(partId, partitionDataStorage, tbl.indexStorageAdapters(partId));
 
-                        TxStateStorage txStatePartitionStorage = getOrCreateTxStateStorage(internalTable.txStateStorage(), partId).join();
-
                         RaftGroupOptions groupOptions = groupOptionsForPartition(
                                 internalTable.storage(),
                                 internalTable.txStateStorage(),
@@ -2046,51 +2052,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
     }
 
-    /**
-     * Returns or creates multi-versioned partition storage.
-     *
-     * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
-     * in when the rebalance was interrupted.
-     *
-     * @param mvTableStorage Multi-versioned table storage.
-     * @param partitionId Partition ID.
-     * @return Future that will complete when the operation completes.
-     */
-    private CompletableFuture<MvPartitionStorage> getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
-        // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
-        // TODO: IGNITE-18633 Also think about waiting for index stores for a partition, see PartitionAccessImpl.startRebalance
-        return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
-                .thenCompose(mvPartitionStorage -> {
-                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS) {
-                        return mvTableStorage.clearPartition(partitionId).thenApply(unused -> mvPartitionStorage);
-                    } else {
-                        return completedFuture(mvPartitionStorage);
-                    }
-                });
-    }
-
-    /**
-     * Returns or creates transaction state storage for a partition.
-     *
-     * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
-     * in when the rebalance was interrupted.
-     *
-     * @param txStateTableStorage Transaction state storage for a table.
-     * @param partitionId Partition ID.
-     * @return Future that will complete when the operation completes.
-     */
-    private CompletableFuture<TxStateStorage> getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partitionId) {
-        // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
-        return CompletableFuture.supplyAsync(() -> txStateTableStorage.getOrCreateTxStateStorage(partitionId), ioExecutor)
-                .thenCompose(txStateStorage -> {
-                    if (txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {
-                        return txStateStorage.clear().thenApply(unused -> txStateStorage);
-                    } else {
-                        return completedFuture(txStateStorage);
-                    }
-                });
-    }
-
     private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
         var peers = new HashSet<String>();
         var learners = new HashSet<String>();
@@ -2105,4 +2066,31 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         return PeersAndLearners.fromConsistentIds(peers, learners);
     }
+
+    /**
+     * Gets or creates both storages of a partition. If either one reports an unfinished rebalance, both are cleared.
+     *
+     * @param table Table whose internal table provides the multi-versioned and transaction state table storages.
+     * @param partitionId Partition ID.
+     * @return Future that completes with the partition storages, after the clearing (if it was needed) has finished.
+     */
+    // TODO: IGNITE-18619 Maybe we should also wait here for the indexes to be created; adding that wait now makes the tests hang
+    private CompletableFuture<PartitionStorages> getOrCreatePartitionStorages(TableImpl table, int partitionId) {
+        return CompletableFuture
+                .supplyAsync(() -> {
+                    MvPartitionStorage mvPartitionStorage = table.internalTable().storage().getOrCreateMvPartition(partitionId);
+                    TxStateStorage txStateStorage = table.internalTable().txStateStorage().getOrCreateTxStateStorage(partitionId);
+
+                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS
+                            || txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {
+                        return CompletableFuture.allOf(
+                                table.internalTable().storage().clearPartition(partitionId),
+                                txStateStorage.clear()
+                        ).thenApply(unused -> new PartitionStorages(mvPartitionStorage, txStateStorage));
+                    } else {
+                        return completedFuture(new PartitionStorages(mvPartitionStorage, txStateStorage));
+                    }
+                }, ioExecutor)
+                .thenCompose(Function.identity());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
new file mode 100644
index 0000000000..cb8251c0ed
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+
+/**
+ * Immutable holder that pairs the multi-versioned storage and the transaction state storage of a partition.
+ */
+public class PartitionStorages {
+    private final MvPartitionStorage mvPartitionStorage;
+
+    private final TxStateStorage txStateStorage;
+
+    /**
+     * Creates a holder for the given pair of storages; the references are stored as-is.
+     *
+     * @param mvPartitionStorage Multi-versioned partition storage.
+     * @param txStateStorage Transaction state storage.
+     */
+    public PartitionStorages(MvPartitionStorage mvPartitionStorage, TxStateStorage txStateStorage) {
+        this.mvPartitionStorage = mvPartitionStorage;
+        this.txStateStorage = txStateStorage;
+    }
+
+    /**
+     * Returns the multi-versioned partition storage that was passed to the constructor.
+     */
+    public MvPartitionStorage getMvPartitionStorage() {
+        return mvPartitionStorage;
+    }
+
+    /**
+     * Returns the transaction state storage that was passed to the constructor.
+     */
+    public TxStateStorage getTxStateStorage() {
+        return txStateStorage;
+    }
+}