You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/24 14:07:50 UTC
[ignite-3] branch main updated: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier (#1562)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a8152876f1 IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier (#1562)
a8152876f1 is described below
commit a8152876f125a281b07cc8ae9527d1e8a412b34b
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Jan 24 17:07:43 2023 +0300
IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier (#1562)
---
.../internal/table/distributed/TableManager.java | 32 +-
.../distributed/raft/snapshot/PartitionAccess.java | 86 +++--
.../raft/snapshot/PartitionAccessImpl.java | 82 ++---
.../snapshot/incoming/IncomingSnapshotCopier.java | 379 +++++++++++----------
.../incoming/IncomingSnapshotCopierTest.java | 9 +-
.../state/test/TestTxStateTableStorage.java | 4 +-
6 files changed, 300 insertions(+), 292 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2e40b60fdd..bb02f0016a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -169,14 +169,6 @@ import org.jetbrains.annotations.TestOnly;
* Table manager.
*/
public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTablesInternal, IgniteComponent {
- /**
- * The special value of the last applied index to indicate the beginning of a full data rebalancing.
- *
- * @see MvPartitionStorage#lastAppliedIndex()
- * @see TxStateStorage#lastAppliedIndex()
- */
- public static final long FULL_RABALANCING_STARTED = -1;
-
private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
// TODO get rid of this in future? IGNITE-17307
@@ -2051,18 +2043,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Future that will complete when the operation completes.
*/
private CompletableFuture<MvPartitionStorage> getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
- return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
- .thenCompose(storage -> {
- if (storage.persistedIndex() != FULL_RABALANCING_STARTED) {
- // If a full rebalance did not happen, then we return the storage as is.
- return completedFuture(storage);
- } else {
- // A full rebalance was started but not completed, so the partition must be recreated to remove the garbage.
- return mvTableStorage
- .destroyPartition(partitionId)
- .thenApplyAsync(unused -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
- }
- });
+ // TODO: IGNITE-18603 Clear if TxStateStorage hasn't been rebalanced yet
+ return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
}
/**
@@ -2075,15 +2057,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @param partId Partition ID.
*/
private static TxStateStorage getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partId) {
- TxStateStorage txStatePartitionStorage = txStateTableStorage.getOrCreateTxStateStorage(partId);
-
- // If a full rebalance did not happen, then we return the storage as is.
- if (txStatePartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
- return txStatePartitionStorage;
- }
-
- txStateTableStorage.destroyTxStateStorage(partId);
-
+ // TODO: IGNITE-18603 Clear if MvPartitionStorage hasn't been rebalanced yet
return txStateTableStorage.getOrCreateTxStateStorage(partId);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index d0297e96c4..75c66e232e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -21,10 +21,12 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.util.Cursor;
@@ -40,21 +42,6 @@ public interface PartitionAccess {
*/
PartitionKey partitionKey();
- /**
- * Destroys and recreates the multi-versioned partition storage.
- *
- * @return Future that will complete when the partition is recreated.
- * @throws StorageException If an error has occurred during the partition destruction.
- */
- CompletableFuture<Void> reCreateMvPartitionStorage() throws StorageException;
-
- /**
- * Destroys and recreates the TX state partition storage.
- *
- * @throws StorageException If an error has occurred during destruction of the transaction state storage for the partition.
- */
- void reCreateTxStatePartitionStorage() throws StorageException;
-
/**
* Creates a cursor to scan all meta of transactions.
*
@@ -124,11 +111,6 @@ public interface PartitionAccess {
*/
void addWriteCommitted(RowId rowId, @Nullable TableRow row, HybridTimestamp commitTimestamp);
- /**
- * Updates the last applied index, term, and RAFT configuration.
- */
- void updateLastApplied(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig);
-
/**
* Returns the minimum applied index of the partition storages.
*/
@@ -148,4 +130,68 @@ public interface PartitionAccess {
* Returns the maximum applied term of the partition storages.
*/
long maxLastAppliedTerm();
+
+ /**
+ * Prepares partition storages for rebalancing.
+ * <ul>
+ * <li>Cancels all current operations (including cursors) with storages and waits for their completion;</li>
+ * <li>Cleans up storages;</li>
+ * <li>Sets the last applied index and term to {@link MvPartitionStorage#REBALANCE_IN_PROGRESS} and the RAFT group configuration to
+ * {@code null};</li>
+ * <li>Only the following methods will be available:<ul>
+ * <li>{@link #partitionKey()};</li>
+ * <li>{@link #minLastAppliedIndex()};</li>
+ * <li>{@link #maxLastAppliedIndex()};</li>
+ * <li>{@link #minLastAppliedTerm()};</li>
+ * <li>{@link #maxLastAppliedTerm()};</li>
+ * <li>{@link #committedGroupConfiguration()};</li>
+ * <li>{@link #addTxMeta(UUID, TxMeta)};</li>
+ * <li>{@link #addWrite(RowId, TableRow, UUID, UUID, int)};</li>
+ * <li>{@link #addWriteCommitted(RowId, TableRow, HybridTimestamp)}.</li>
+ * </ul></li>
+ * </ul>
+ *
+ * <p>Must be called before every rebalance; the rebalance must then end with a call to one of the methods:
+ * <ul>
+ * <li>{@link #abortRebalance()} - in case of errors or cancellation of rebalance;</li>
+ * <li>{@link #finishRebalance(long, long, RaftGroupConfiguration)} - in case of successful completion of rebalance.</li>
+ * </ul>
+ *
+ * @return Future of the operation.
+ * @throws StorageRebalanceException If there are errors when trying to start rebalancing.
+ */
+ CompletableFuture<Void> startRebalance();
+
+ /**
+ * Aborts rebalancing of the partition storages.
+ * <ul>
+ * <li>Cleans up storages;</li>
+ * <li>Resets the last applied index, term, and RAFT group configuration;</li>
+ * <li>All methods will be available.</li>
+ * </ul>
+ *
+ * <p>If rebalance has not started, then nothing will happen.
+ *
+ * @return Future of the operation.
+ * @throws StorageRebalanceException If there are errors when trying to abort rebalancing.
+ */
+ CompletableFuture<Void> abortRebalance();
+
+ /**
+ * Completes rebalancing of the partition storages.
+ * <ul>
+ * <li>Cleans up storages;</li>
+ * <li>Updates the last applied index, term, and RAFT group configuration;</li>
+ * <li>All methods will be available.</li>
+ * </ul>
+ *
+ * <p>If rebalance has not started, then {@link StorageRebalanceException} will be thrown.
+ *
+ * @param lastAppliedIndex Last applied index.
+ * @param lastAppliedTerm Last applied term.
+ * @param raftGroupConfig RAFT group configuration.
+ * @return Future of the operation.
+ * @throws StorageRebalanceException If there are errors when trying to finish rebalancing.
+ */
+ CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 2b28e74334..4748a541da 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
-import static org.apache.ignite.internal.table.distributed.TableManager.FULL_RABALANCING_STARTED;
-
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -29,7 +27,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.tx.TxMeta;
@@ -77,36 +74,6 @@ public class PartitionAccessImpl implements PartitionAccess {
return partitionKey;
}
- @Override
- public CompletableFuture<Void> reCreateMvPartitionStorage() throws StorageException {
- assert mvTableStorage.getMvPartition(partitionId()) != null : "table=" + tableName() + ", part=" + partitionId();
-
- // TODO: IGNITE-18030 - actually recreate or do in a different way
- //return mvTableStorage.destroyPartition(partId())
- return CompletableFuture.completedFuture(null)
- .thenApply(unused -> {
- MvPartitionStorage mvPartitionStorage = mvTableStorage.getOrCreateMvPartition(partitionId());
-
- mvPartitionStorage.runConsistently(() -> {
- mvPartitionStorage.lastApplied(FULL_RABALANCING_STARTED, 0);
-
- return null;
- });
-
- return null;
- });
- }
-
- @Override
- public void reCreateTxStatePartitionStorage() throws StorageException {
- assert txStateTableStorage.getTxStateStorage(partitionId()) != null : "table=" + tableName() + ", part=" + partitionId();
-
- // TODO: IGNITE-18030 - actually recreate or do in a different way
- //txStateTableStorage.destroyTxStateStorage(partId());
-
- txStateTableStorage.getOrCreateTxStateStorage(partitionId()).lastApplied(FULL_RABALANCING_STARTED, 0);
- }
-
private int partitionId() {
return partitionKey.partitionId();
}
@@ -166,22 +133,6 @@ public class PartitionAccessImpl implements PartitionAccess {
});
}
- @Override
- public void updateLastApplied(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
- MvPartitionStorage mvPartitionStorage = getMvPartitionStorage(partitionId());
- TxStateStorage txStateStorage = getTxStateStorage(partitionId());
-
- mvPartitionStorage.runConsistently(() -> {
- mvPartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
-
- txStateStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
-
- mvPartitionStorage.committedGroupConfiguration(raftGroupConfig);
-
- return null;
- });
- }
-
@Override
public long minLastAppliedIndex() {
return Math.min(
@@ -214,6 +165,39 @@ public class PartitionAccessImpl implements PartitionAccess {
);
}
+ @Override
+ public CompletableFuture<Void> startRebalance() {
+ // TODO: IGNITE-18619 Fix this workaround, we should have already waited for the indexes to be created
+ indexes.get();
+
+ TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+
+ return CompletableFuture.allOf(
+ mvTableStorage.startRebalancePartition(partitionId()),
+ txStateStorage.startRebalance()
+ );
+ }
+
+ @Override
+ public CompletableFuture<Void> abortRebalance() {
+ TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+
+ return CompletableFuture.allOf(
+ mvTableStorage.abortRebalancePartition(partitionId()),
+ txStateStorage.abortRebalance()
+ );
+ }
+
+ @Override
+ public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
+ TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+
+ return CompletableFuture.allOf(
+ mvTableStorage.finishRebalancePartition(partitionId(), lastAppliedIndex, lastAppliedTerm, raftGroupConfig),
+ txStateStorage.finishRebalance(lastAppliedIndex, lastAppliedTerm)
+ );
+ }
+
private MvPartitionStorage getMvPartitionStorage(int partitionId) {
MvPartitionStorage mvPartitionStorage = mvTableStorage.getMvPartition(partitionId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index f8a698b5d1..3e05c7f320 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -20,11 +20,11 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.nio.ByteBuffer;
-import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.error.RaftError;
@@ -69,6 +70,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private final SnapshotUri snapshotUri;
+ /** Busy lock for synchronous rebalance cancellation. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
/**
* Snapshot meta read from the leader.
*
@@ -77,10 +81,14 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
@Nullable
private volatile SnapshotMeta snapshotMeta;
- private volatile boolean canceled;
+ @Nullable
+ private volatile CompletableFuture<?> rebalanceFuture;
+ /**
+ * Future awaited in {@link #join()}, because it is important for us to wait for the rebalance to finish or abort.
+ */
@Nullable
- private volatile CompletableFuture<?> future;
+ private volatile CompletableFuture<?> joinFuture;
/**
* Constructor.
@@ -97,46 +105,31 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
public void start() {
Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
- LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+ LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
- future = prepareMvPartitionStorageForRebalance()
- .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+ rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
.thenCompose(unused -> {
ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
if (snapshotSender == null) {
- LOG.error(
- "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
- partId(),
- tableId(),
- snapshotUri.nodeName
- );
-
- if (!isOk()) {
- setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
- }
-
- return completedFuture(null);
+ throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
}
return loadSnapshotMeta(snapshotSender)
.thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
- .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
- .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+ .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
});
+
+ joinFuture = rebalanceFuture.handle((unused, throwable) -> completeRebalance(throwable)).thenCompose(Function.identity());
}
@Override
public void join() throws InterruptedException {
- CompletableFuture<?> fut = future;
+ CompletableFuture<?> fut = joinFuture;
if (fut != null) {
try {
fut.get();
-
- if (canceled && !isOk()) {
- setError(RaftError.ECANCELED, "Copier is cancelled");
- }
} catch (CancellationException e) {
// Ignored.
} catch (ExecutionException e) {
@@ -156,11 +149,11 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
@Override
public void cancel() {
- canceled = true;
+ busyLock.block();
- LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+ LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());
- CompletableFuture<?> fut = future;
+ CompletableFuture<?> fut = rebalanceFuture;
if (fut != null) {
fut.cancel(false);
@@ -185,34 +178,6 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
return new IncomingSnapshotReader(snapshotMeta);
}
- /**
- * Prepares the {@link MvPartitionStorage} for a full rebalance.
- */
- private CompletableFuture<?> prepareMvPartitionStorageForRebalance() {
- if (canceled) {
- return completedFuture(null);
- }
-
- // TODO: IGNITE-18030 Delete it
- return partitionSnapshotStorage.partition().reCreateMvPartitionStorage();
- }
-
- /**
- * Prepares the {@link TxStateStorage} for a full rebalance.
- *
- * <p>Recreates {@link TxStateStorage} and sets the last applied index to {@link TableManager#FULL_RABALANCING_STARTED} so that when
- * the node is restarted, we can understand that the full rebalance has not completed, and we need to clean up the storage from
- * garbage.
- */
- private CompletableFuture<?> prepareTxStatePartitionStorageForRebalance(Executor executor) {
- if (canceled) {
- return completedFuture(null);
- }
-
- // TODO: IGNITE-18030 Delete it
- return CompletableFuture.runAsync(() -> partitionSnapshotStorage.partition().reCreateTxStatePartitionStorage(), executor);
- }
-
private @Nullable ClusterNode getSnapshotSender(String nodeName) {
return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);
}
@@ -221,178 +186,220 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
* Requests and saves the snapshot meta in {@link #snapshotMeta}.
*/
private CompletableFuture<?> loadSnapshotMeta(ClusterNode snapshotSender) {
- if (canceled) {
+ if (!busyLock.enterBusy()) {
return completedFuture(null);
}
- return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
- snapshotSender,
- MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
- NETWORK_TIMEOUT
- ).thenAccept(response -> {
- snapshotMeta = ((SnapshotMetaResponse) response).meta();
-
- LOG.info("Copier has loaded the snapshot meta for the partition [partId={}, tableId={}, meta={}]",
- partId(), tableId(), snapshotMeta);
- });
+ try {
+ return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+ snapshotSender,
+ MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
+ NETWORK_TIMEOUT
+ ).thenAccept(response -> {
+ snapshotMeta = ((SnapshotMetaResponse) response).meta();
+
+ LOG.info("Copier has loaded the snapshot meta for the partition [{}, meta={}]", createPartitionInfo(), snapshotMeta);
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
* Requests and stores data into {@link MvPartitionStorage}.
*/
private CompletableFuture<?> loadSnapshotMvData(ClusterNode snapshotSender, Executor executor) {
- if (canceled) {
+ if (!busyLock.enterBusy()) {
return completedFuture(null);
}
- return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
- snapshotSender,
- MSG_FACTORY.snapshotMvDataRequest()
- .id(snapshotUri.snapshotId)
- .batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT)
- .build(),
- NETWORK_TIMEOUT
- ).thenComposeAsync(response -> {
- SnapshotMvDataResponse snapshotMvDataResponse = ((SnapshotMvDataResponse) response);
-
- for (ResponseEntry entry : snapshotMvDataResponse.rows()) {
- if (canceled) {
- return completedFuture(null);
- }
-
- // Let's write all versions for the row ID.
- RowId rowId = new RowId(partId(), entry.rowId());
-
- for (int i = 0; i < entry.rowVersions().size(); i++) {
- HybridTimestamp timestamp = i < entry.timestamps().size() ? entry.timestamps().get(i) : null;
-
- ByteBuffer rowVersion = entry.rowVersions().get(i);
-
- TableRow tableRow = rowVersion == null ? null : new TableRow(rowVersion.rewind());
-
- PartitionAccess partition = partitionSnapshotStorage.partition();
-
- if (timestamp == null) {
- // Writes an intent to write (uncommitted version).
- assert entry.txId() != null;
- assert entry.commitTableId() != null;
- assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+ try {
+ return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+ snapshotSender,
+ MSG_FACTORY.snapshotMvDataRequest()
+ .id(snapshotUri.snapshotId)
+ .batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT)
+ .build(),
+ NETWORK_TIMEOUT
+ ).thenComposeAsync(response -> {
+ SnapshotMvDataResponse snapshotMvDataResponse = ((SnapshotMvDataResponse) response);
+
+ for (ResponseEntry entry : snapshotMvDataResponse.rows()) {
+ // Let's write all versions for the row ID.
+ for (int i = 0; i < entry.rowVersions().size(); i++) {
+ if (!busyLock.enterBusy()) {
+ return completedFuture(null);
+ }
- partition.addWrite(rowId, tableRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
- } else {
- // Writes committed version.
- partition.addWriteCommitted(rowId, tableRow, timestamp);
+ try {
+ writeVersion(entry, i);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
}
- }
- if (snapshotMvDataResponse.finish()) {
- LOG.info(
- "Copier has finished loading multi-versioned data [partId={}, rows={}]",
- partId(),
- snapshotMvDataResponse.rows().size()
- );
-
- return completedFuture(null);
- } else {
- LOG.info(
- "Copier has loaded a portion of multi-versioned data [partId={}, rows={}]",
- partId(),
- snapshotMvDataResponse.rows().size()
- );
-
- // Let's upload the rest.
- return loadSnapshotMvData(snapshotSender, executor);
- }
- }, executor);
+ if (snapshotMvDataResponse.finish()) {
+ LOG.info(
+ "Copier has finished loading multi-versioned data [{}, rows={}]",
+ createPartitionInfo(),
+ snapshotMvDataResponse.rows().size()
+ );
+
+ return completedFuture(null);
+ } else {
+ LOG.info(
+ "Copier has loaded a portion of multi-versioned data [{}, rows={}]",
+ createPartitionInfo(),
+ snapshotMvDataResponse.rows().size()
+ );
+
+ // Let's load the rest.
+ return loadSnapshotMvData(snapshotSender, executor);
+ }
+ }, executor);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
* Requests and stores data into {@link TxStateStorage}.
*/
private CompletableFuture<?> loadSnapshotTxData(ClusterNode snapshotSender, Executor executor) {
- if (canceled) {
+ if (!busyLock.enterBusy()) {
return completedFuture(null);
}
- return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
- snapshotSender,
- MSG_FACTORY.snapshotTxDataRequest()
- .id(snapshotUri.snapshotId)
- .maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE)
- .build(),
- NETWORK_TIMEOUT
- ).thenComposeAsync(response -> {
- SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;
-
- assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size();
+ try {
+ return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+ snapshotSender,
+ MSG_FACTORY.snapshotTxDataRequest()
+ .id(snapshotUri.snapshotId)
+ .maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE)
+ .build(),
+ NETWORK_TIMEOUT
+ ).thenComposeAsync(response -> {
+ SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;
+
+ assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size() : createPartitionInfo();
+
+ for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
+ if (!busyLock.enterBusy()) {
+ return completedFuture(null);
+ }
- for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
- if (canceled) {
- return completedFuture(null);
+ try {
+ partitionSnapshotStorage.partition().addTxMeta(
+ snapshotTxDataResponse.txIds().get(i),
+ snapshotTxDataResponse.txMeta().get(i)
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
}
- partitionSnapshotStorage.partition().addTxMeta(
- snapshotTxDataResponse.txIds().get(i),
- snapshotTxDataResponse.txMeta().get(i)
- );
- }
+ if (snapshotTxDataResponse.finish()) {
+ LOG.info(
+ "Copier has finished loading transaction meta [{}, metas={}]",
+ createPartitionInfo(),
+ snapshotTxDataResponse.txMeta().size()
+ );
- if (snapshotTxDataResponse.finish()) {
- LOG.info(
- "Copier has finished loading transaction meta [partId={}, metas={}]",
- partId(),
- snapshotTxDataResponse.txMeta().size()
- );
-
- return completedFuture(null);
- } else {
- LOG.info(
- "Copier has loaded a portion of transaction meta [partId={}, metas={}]",
- partId(),
- snapshotTxDataResponse.txMeta().size()
- );
-
- // Let's upload the rest.
- return loadSnapshotTxData(snapshotSender, executor);
- }
- }, executor);
+ return completedFuture(null);
+ } else {
+ LOG.info(
+ "Copier has loaded a portion of transaction meta [{}, metas={}]",
+ createPartitionInfo(),
+ snapshotTxDataResponse.txMeta().size()
+ );
+
+ // Let's load the rest.
+ return loadSnapshotTxData(snapshotSender, executor);
+ }
+ }, executor);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
- * Updates the last applied index for {@link MvPartitionStorage} and {@link TxStateStorage} from the {@link #snapshotMeta}.
+ * Completes rebalancing of the partition storages.
+ *
+ * @param throwable Error occurred while rebalancing the partition storages, {@code null} means that the rebalancing was successful.
*/
- private void updateLastAppliedIndexFromSnapshotMetaForStorages() {
- if (canceled) {
- return;
- }
+ private CompletableFuture<Void> completeRebalance(@Nullable Throwable throwable) {
+ if (!busyLock.enterBusy()) {
+ if (!isOk()) {
+ setError(RaftError.ECANCELED, "Copier is cancelled");
+ }
- SnapshotMeta meta = snapshotMeta;
+ return partitionSnapshotStorage.partition().abortRebalance();
+ }
- assert meta != null;
+ try {
+ if (throwable != null) {
+ LOG.error("Partition rebalancing error [{}]", throwable, createPartitionInfo());
- RaftGroupConfiguration raftGroupConfig = new RaftGroupConfiguration(
- meta.peersList(),
- meta.learnersList(),
- meta.oldPeersList(),
- meta.oldLearnersList()
- );
+ if (!isOk()) {
+ setError(RaftError.UNKNOWN, throwable.getMessage());
+ }
- partitionSnapshotStorage.partition().updateLastApplied(meta.lastIncludedIndex(), meta.lastIncludedTerm(), raftGroupConfig);
+ return partitionSnapshotStorage.partition().abortRebalance();
+ }
- LOG.info(
- "Copier has finished updating last applied index for the partition [partId={}, lastAppliedIndex={}, lastAppliedTerm={}]",
- partId(),
- meta.lastIncludedIndex(),
- meta.lastIncludedTerm()
- );
+ SnapshotMeta meta = snapshotMeta;
+
+ RaftGroupConfiguration raftGroupConfig = new RaftGroupConfiguration(
+ meta.peersList(),
+ meta.learnersList(),
+ meta.oldPeersList(),
+ meta.oldLearnersList()
+ );
+
+ LOG.info(
+ "Copier completes the rebalancing of the partition: [{}, lastAppliedIndex={}, lastAppliedTerm={}, raftGroupConfig={}]",
+ createPartitionInfo(),
+ meta.lastIncludedIndex(),
+ meta.lastIncludedTerm(),
+ raftGroupConfig
+ );
+
+ return partitionSnapshotStorage.partition().finishRebalance(meta.lastIncludedIndex(), meta.lastIncludedTerm(), raftGroupConfig);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
private int partId() {
return partitionSnapshotStorage.partition().partitionKey().partitionId();
}
- private UUID tableId() {
- return partitionSnapshotStorage.partition().partitionKey().tableId();
+ private String createPartitionInfo() {
+ return "tableId=" + partitionSnapshotStorage.partition().partitionKey().tableId() + ", partitionId=" + partId();
+ }
+
+ private void writeVersion(ResponseEntry entry, int i) {
+ RowId rowId = new RowId(partId(), entry.rowId());
+
+ HybridTimestamp timestamp = i < entry.timestamps().size() ? entry.timestamps().get(i) : null;
+
+ ByteBuffer rowVersion = entry.rowVersions().get(i);
+
+ TableRow tableRow = rowVersion == null ? null : new TableRow(rowVersion.rewind());
+
+ PartitionAccess partition = partitionSnapshotStorage.partition();
+
+ if (timestamp == null) {
+ // Writes an intent to write (uncommitted version).
+ assert entry.txId() != null;
+ assert entry.commitTableId() != null;
+ assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+ partition.addWrite(rowId, tableRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
+ } else {
+ // Writes committed version.
+ partition.addWriteCommitted(rowId, tableRow, timestamp);
+ }
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index bba3b0400d..7e6cd7c60d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -189,13 +189,8 @@ public class IncomingSnapshotCopierTest {
assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
- // TODO: IGNITE-18030 - uncomment the following line or remove it if not needed after the rework
- //verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
- verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
-
- // TODO: IGNITE-18030 - uncomment the following line or remove it if not needed after the rework
- //verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
- verify(incomingTxStateTableStorage, times(2)).getOrCreateTxStateStorage(eq(TEST_PARTITION));
+ verify(incomingMvTableStorage, times(1)).startRebalancePartition(eq(TEST_PARTITION));
+ verify(incomingTxStatePartitionStorage, times(1)).startRebalance();
}
private MessagingService messagingServiceForSuccessScenario(MvPartitionStorage outgoingMvPartitionStorage,
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java
index ac23d35c21..d84d4ea013 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.tx.storage.state.test;
+import static org.mockito.Mockito.spy;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -31,7 +33,7 @@ public class TestTxStateTableStorage implements TxStateTableStorage {
private final Map<Integer, TxStateStorage> storages = new ConcurrentHashMap<>();
@Override public TxStateStorage getOrCreateTxStateStorage(int partitionId) {
- return storages.computeIfAbsent(partitionId, k -> new TestTxStateStorage());
+ return storages.computeIfAbsent(partitionId, k -> spy(new TestTxStateStorage()));
}
@Override