You are viewing a plain text version of this content. The canonical link to it was lost when this text was extracted.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/10/27 12:36:25 UTC
[ignite-3] branch main updated: IGNITE-17894 Implement RAFT snapshot streaming receiver (#1233)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 93914175c5 IGNITE-17894 Implement RAFT snapshot streaming receiver (#1233)
93914175c5 is described below
commit 93914175c5ce1c55d123c6a2225ff861eee2f8f5
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Oct 27 15:36:20 2022 +0300
IGNITE-17894 Implement RAFT snapshot streaming receiver (#1233)
---
.../org/apache/ignite/internal/util/Cursor.java | 11 +
.../org/apache/ignite/internal/storage/RowId.java | 30 +-
.../internal/storage/engine/MvTableStorage.java | 2 +-
.../storage/impl/TestMvPartitionStorage.java | 4 +-
.../internal/storage/impl/TestMvTableStorage.java | 4 +-
.../pagememory/AbstractPageMemoryTableStorage.java | 5 +-
.../storage/rocksdb/RocksDbTableStorage.java | 36 +-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 6 +-
.../internal/table/distributed/TableManager.java | 302 ++++++++++------
.../distributed/raft/snapshot/PartitionAccess.java | 35 +-
.../raft/snapshot/PartitionAccessImpl.java | 82 +++--
.../distributed/raft/snapshot/PartitionKey.java | 5 +-
.../raft/snapshot/PartitionSnapshotStorage.java | 26 +-
.../snapshot/PartitionSnapshotStorageFactory.java | 19 +-
.../distributed/raft/snapshot/SnapshotUri.java | 4 +-
.../snapshot/incoming/IncomingSnapshotCopier.java | 391 ++++++++++++++++++---
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 12 +-
.../snapshot/outgoing/OutgoingSnapshotReader.java | 2 +-
.../raft/snapshot/PartitionAccessImplTest.java | 86 -----
.../incoming/IncomingSnapshotCopierTest.java | 382 ++++++++++++++++++++
.../snapshot/outgoing/OutgoingSnapshotTest.java | 69 ++--
.../internal/tx/storage/state/TxStateStorage.java | 5 +
.../tx/storage/state/TxStateTableStorage.java | 4 +-
.../state/rocksdb/TxStateRocksDbStorage.java | 30 +-
.../state/rocksdb/TxStateRocksDbTableStorage.java | 52 ++-
.../test/TestConcurrentHashMapTxStateStorage.java | 21 +-
.../TestConcurrentHashMapTxStateTableStorage.java | 36 +-
27 files changed, 1183 insertions(+), 478 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
index e4a6f53a07..b08798ddcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
@@ -64,6 +64,17 @@ public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
};
}
+ /**
+ * Creates an iterable based cursor.
+ *
+ * @param iterable Iterable.
+ * @param <T> Type of elements.
+ * @return Cursor.
+ */
+ static <T> Cursor<T> fromIterable(Iterable<? extends T> iterable) {
+ return fromIterator(iterable.iterator());
+ }
+
/**
* Returns a sequential Stream over the elements covered by this cursor.
*
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
index 25dd5c4323..e38e3759fa 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
@@ -23,15 +23,15 @@ import org.apache.ignite.internal.tx.Timestamp;
import org.jetbrains.annotations.Nullable;
/**
- * Class that represents row id in primary index of the table. Contains a timestamp-based UUID and a partition id.
+ * Class that represents row ID in primary index of the table. Contains a timestamp-based UUID and a partition ID.
*
* @see MvPartitionStorage
*/
public final class RowId implements Serializable, Comparable<RowId> {
- /** Partition id. Short type reduces payload when transferring an object over network. */
+ /** Partition ID. Short type reduces payload when transferring an object over network. */
private final short partitionId;
- /** Unique id. */
+ /** Unique ID. */
private final UUID uuid;
public static RowId lowestRowId(int partitionId) {
@@ -39,9 +39,9 @@ public final class RowId implements Serializable, Comparable<RowId> {
}
/**
- * Create a row id with the UUID value based on {@link Timestamp}.
+ * Create a row ID with the UUID value based on {@link Timestamp}.
*
- * @param partitionId Partition id.
+ * @param partitionId Partition ID.
*/
public RowId(int partitionId) {
this(partitionId, Timestamp.nextVersion().toUuid());
@@ -50,7 +50,7 @@ public final class RowId implements Serializable, Comparable<RowId> {
/**
* Constructor.
*
- * @param partitionId Partition id.
+ * @param partitionId Partition ID.
* @param mostSignificantBits UUID's most significant bits.
* @param leastSignificantBits UUID's least significant bits.
*/
@@ -58,27 +58,33 @@ public final class RowId implements Serializable, Comparable<RowId> {
this(partitionId, new UUID(mostSignificantBits, leastSignificantBits));
}
- private RowId(int partitionId, UUID uuid) {
+ /**
+ * Constructor.
+ *
+ * @param partitionId Partition ID.
+ * @param uuid UUID.
+ */
+ public RowId(int partitionId, UUID uuid) {
this.partitionId = (short) partitionId;
this.uuid = uuid;
}
/**
- * Returns a partition id for current row id.
+ * Returns a partition ID for current row ID.
*/
public int partitionId() {
return partitionId & 0xFFFF;
}
/**
- * Returns the most significant 64 bits of row id's UUID.
+ * Returns the most significant 64 bits of row ID's UUID.
*/
public long mostSignificantBits() {
return uuid.getMostSignificantBits();
}
/**
- * Returns the least significant 64 bits of row id's UUID.
+ * Returns the least significant 64 bits of row ID's UUID.
*/
public long leastSignificantBits() {
return uuid.getLeastSignificantBits();
@@ -86,8 +92,6 @@ public final class RowId implements Serializable, Comparable<RowId> {
/**
* Returns the UUID equivalent of {@link #mostSignificantBits()} and {@link #leastSignificantBits()}.
- *
- * @return UUID.
*/
public UUID uuid() {
return uuid;
@@ -131,7 +135,7 @@ public final class RowId implements Serializable, Comparable<RowId> {
}
/**
- * Returns the next row id withing a single partition, or {@code null} if current row id already has maximal possible value.
+ * Returns the next row ID within a single partition, or {@code null} if the current row ID already has the maximal possible value.
*/
public @Nullable RowId increment() {
long lsb = uuid.getLeastSignificantBits() + 1;
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index d5b06a103c..386c7db7b9 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -63,7 +63,7 @@ public interface MvTableStorage {
* @throws IllegalArgumentException If Partition ID is out of bounds.
* @throws StorageException If an error has occurred during the partition destruction.
*/
- CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException;
+ void destroyPartition(int partitionId) throws StorageException;
/**
* Returns an already created Index (either Sorted or Hash) with the given name or creates a new one if it does not exist.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 50e27bd297..6a01278bb4 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable;
public class TestMvPartitionStorage implements MvPartitionStorage {
private final ConcurrentNavigableMap<RowId, VersionChain> map = new ConcurrentSkipListMap<>();
- private long lastAppliedIndex = 0;
+ private volatile long lastAppliedIndex;
private final int partitionId;
@@ -106,8 +106,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
- assert lastAppliedIndex > this.lastAppliedIndex : "current=" + this.lastAppliedIndex + ", new=" + lastAppliedIndex;
-
this.lastAppliedIndex = lastAppliedIndex;
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 27a7c94a8c..9293496fad 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -100,15 +100,13 @@ public class TestMvTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+ public void destroyPartition(int partitionId) throws StorageException {
Integer boxedPartitionId = partitionId;
partitions.remove(boxedPartitionId);
sortedIndicesById.values().forEach(indices -> indices.storageByPartitionId.remove(boxedPartitionId));
hashIndicesById.values().forEach(indices -> indices.storageByPartitionId.remove(boxedPartitionId));
-
- return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 76db11640e..969a798710 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -130,7 +130,7 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+ public void destroyPartition(int partitionId) throws StorageException {
assert started : "Storage has not started yet";
MvPartitionStorage partition = getMvPartition(partitionId);
@@ -141,9 +141,6 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
// TODO: IGNITE-17197 Actually destroy the partition.
//partition.destroy();
}
-
- // TODO: IGNITE-17197 Convert this to true async code.
- return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 58b1530379..1ba1f0763c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -173,7 +173,6 @@ public class RocksDbTableStorage implements MvTableStorage {
return meta.columnFamily().handle();
}
- /** {@inheritDoc} */
@Override
public TableConfiguration configuration() {
return tableCfg;
@@ -184,7 +183,6 @@ public class RocksDbTableStorage implements MvTableStorage {
return tablesCfg;
}
- /** {@inheritDoc} */
@Override
public void start() throws StorageException {
flusher = new RocksDbFlusher(
@@ -306,7 +304,6 @@ public class RocksDbTableStorage implements MvTableStorage {
}
}
- /** {@inheritDoc} */
@Override
public void stop() throws StorageException {
if (!stopGuard.compareAndSet(false, true)) {
@@ -345,7 +342,6 @@ public class RocksDbTableStorage implements MvTableStorage {
}
}
- /** {@inheritDoc} */
@Override
public void destroy() throws StorageException {
stop();
@@ -353,7 +349,6 @@ public class RocksDbTableStorage implements MvTableStorage {
IgniteUtils.deleteIfExists(tablePath);
}
- /** {@inheritDoc} */
@Override
public RocksDbMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
@@ -371,7 +366,6 @@ public class RocksDbTableStorage implements MvTableStorage {
return partition;
}
- /** {@inheritDoc} */
@Override
public @Nullable RocksDbMvPartitionStorage getMvPartition(int partitionId) {
checkPartitionId(partitionId);
@@ -379,32 +373,24 @@ public class RocksDbTableStorage implements MvTableStorage {
return partitions.get(partitionId);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+ public void destroyPartition(int partitionId) throws StorageException {
checkPartitionId(partitionId);
RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
- if (mvPartition == null) {
- return CompletableFuture.completedFuture(null);
- }
+ if (mvPartition != null) {
+ //TODO IGNITE-17626 Destroy indexes as well...
+ mvPartition.destroy();
- //TODO IGNITE-17626 Destroy indexes as well...
- mvPartition.destroy();
-
- // Wait for the data to actually be removed from the disk and close the storage.
- return awaitFlush(false)
- .whenComplete((v, e) -> {
- try {
- mvPartition.close();
- } catch (Exception ex) {
- LOG.error("Error when closing partition storage for partId = {}", ex, partitionId);
- }
- });
+ try {
+ mvPartition.close();
+ } catch (Exception e) {
+ throw new StorageException("Error when closing partition storage for the partition: " + partitionId, e);
+ }
+ }
}
- /** {@inheritDoc} */
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
@@ -452,7 +438,6 @@ public class RocksDbTableStorage implements MvTableStorage {
return storages.getOrCreateStorage(partitionStorage);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> destroyIndex(UUID indexId) {
HashIndex hashIdx = hashIndices.remove(indexId);
@@ -479,7 +464,6 @@ public class RocksDbTableStorage implements MvTableStorage {
}
}
- /** {@inheritDoc} */
@Override
public boolean isVolatile() {
return false;
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 038404585b..b6a54467de 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -27,7 +26,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.nio.file.Path;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -108,13 +106,11 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
partitionStorage1.runConsistently(() -> partitionStorage1.addWrite(rowId1, testData, txId, UUID.randomUUID(), 0));
- CompletableFuture<Void> destroyFuture = tableStorage.destroyPartition(PARTITION_ID_0);
+ tableStorage.destroyPartition(PARTITION_ID_0);
// Partition destruction doesn't enforce flush.
((RocksDbTableStorage) tableStorage).awaitFlush(true);
- assertThat(destroyFuture, willCompleteSuccessfully());
-
assertThat(tableStorage.getMvPartition(PARTITION_ID_0), is(nullValue()));
assertThat(tableStorage.getOrCreateMvPartition(PARTITION_ID_0).read(rowId0, HybridTimestamp.MAX_VALUE).binaryRow(),
is(nullValue()));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b892fb396a..60f6bda20e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -131,6 +131,7 @@ import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
import org.apache.ignite.internal.util.ByteUtils;
@@ -169,6 +170,14 @@ import org.jetbrains.annotations.TestOnly;
*/
public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
IgniteComponent {
+ /**
+ * Special value of the last applied index indicating that a full data rebalance has started.
+ *
+ * @see MvPartitionStorage#lastAppliedIndex()
+ * @see TxStateStorage#lastAppliedIndex()
+ */
+ public static final long FULL_RABALANCING_STARTED = -1;
+
private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
// TODO get rid of this in future? IGNITE-17307
@@ -286,6 +295,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Assignment change event listeners. */
private final CopyOnWriteArrayList<Consumer<IgniteTablesInternal>> assignmentsChangeListeners = new CopyOnWriteArrayList<>();
+ /** Incoming RAFT snapshots executor. */
+ private final ExecutorService incomingSnapshotsExecutor;
+
/** Rebalance scheduler pool size. */
private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
@@ -413,6 +425,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(nodeName, "tableManager-io", LOG));
+
+ incomingSnapshotsExecutor = new ThreadPoolExecutor(
+ Utils.cpus(),
+ Utils.cpus(),
+ 100,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG)
+ );
}
/** {@inheritDoc} */
@@ -680,10 +701,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
MvTableStorage storage = internalTbl.storage();
boolean isInMemory = storage.isVolatile();
- // TODO: IGNITE-17197 Remove assert after the ticket is resolved.
- assert internalTbl.storage() instanceof MvTableStorage :
- "Only multi version storages are supported. Current storage is a " + internalTbl.storage().getClass().getName();
-
// start new nodes, only if it is table creation
// other cases will be covered by rebalance logic
Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptySet();
@@ -697,10 +714,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
- startGroupFut = CompletableFuture
- .supplyAsync(() -> getOrCreateMvPartition(internalTbl, partId), ioExecutor)
- .thenComposeAsync((partitionStorage) -> {
- boolean hasData = partitionStorage.lastAppliedIndex() > 0;
+ startGroupFut = CompletableFuture.supplyAsync(() -> getOrCreateMvPartition(internalTbl.storage(), partId), ioExecutor)
+ .thenComposeAsync(mvPartitionStorage -> {
+ boolean hasData = mvPartitionStorage.lastAppliedIndex() > 0;
CompletableFuture<Boolean> fut;
@@ -732,42 +748,52 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
fut = CompletableFuture.completedFuture(true);
}
- return fut.thenComposeAsync(startGroup -> {
+ return fut.thenCompose(startGroup -> {
if (!startGroup) {
return CompletableFuture.completedFuture(null);
}
- RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, partId, tblCfg, partitionStorage,
- newPartAssignment);
-
- try {
- raftMgr.startRaftGroupNode(
- replicaGrpId,
- newPartAssignment,
- new PartitionListener(
- partitionDataStorage(partitionStorage, internalTbl, partId),
- internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
- txManager,
- primaryIndex
- ),
- new RebalanceRaftGroupEventsListener(
- metaStorageMgr,
- tablesCfg.tables().get(tablesById.get(tblId).name()),
- replicaGrpId,
- partId,
- busyLock,
- movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
- this::calculateAssignments,
- rebalanceScheduler
- ),
- groupOptions
- );
-
- return CompletableFuture.completedFuture(null);
- } catch (NodeStoppingException ex) {
- return CompletableFuture.failedFuture(ex);
- }
- }, ioExecutor);
+ return CompletableFuture.supplyAsync(
+ () -> getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
+ ioExecutor
+ )
+ .thenComposeAsync(txStatePartitionStorage -> {
+ RaftGroupOptions groupOptions = groupOptionsForPartition(
+ internalTbl.storage(),
+ internalTbl.txStateStorage(),
+ partitionKey(internalTbl, partId),
+ newPartAssignment
+ );
+
+ try {
+ raftMgr.startRaftGroupNode(
+ replicaGrpId,
+ newPartAssignment,
+ new PartitionListener(
+ partitionDataStorage(mvPartitionStorage, internalTbl, partId),
+ txStatePartitionStorage,
+ txManager,
+ primaryIndex
+ ),
+ new RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ tablesCfg.tables().get(tablesById.get(tblId).name()),
+ replicaGrpId,
+ partId,
+ busyLock,
+ movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
+ this::calculateAssignments,
+ rebalanceScheduler
+ ),
+ groupOptions
+ );
+
+ return CompletableFuture.completedFuture(null);
+ } catch (NodeStoppingException ex) {
+ return CompletableFuture.failedFuture(ex);
+ }
+ }, ioExecutor);
+ });
}, ioExecutor);
}
@@ -779,36 +805,47 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return CompletableFuture.failedFuture(ex);
}
}, ioExecutor)
- .thenAccept(
- updatedRaftGroupService -> {
- ((InternalTableImpl) internalTbl)
- .updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
-
- if (replicaMgr.shouldHaveReplicationGroupLocally(nodes)) {
- MvPartitionStorage partitionStorage = getOrCreateMvPartition(internalTbl, partId);
-
- try {
- replicaMgr.startReplica(replicaGrpId,
- new PartitionReplicaListener(
- partitionStorage,
- updatedRaftGroupService,
- txManager,
- lockMgr,
- partId,
- tblId,
- primaryIndex,
- clock,
- internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
- topologyService,
- placementDriver
- )
- );
- } catch (NodeStoppingException ex) {
- throw new AssertionError("Loza was stopped before Table manager", ex);
- }
- }
- }
- ).exceptionally(th -> {
+ .thenCompose(updatedRaftGroupService -> {
+ ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
+
+ if (replicaMgr.shouldHaveReplicationGroupLocally(nodes)) {
+ return CompletableFuture.supplyAsync(
+ () -> getOrCreateMvPartition(internalTbl.storage(), partId),
+ ioExecutor
+ )
+ .thenCombine(
+ CompletableFuture.supplyAsync(
+ () -> getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
+ ioExecutor
+ ),
+ (mvPartitionStorage, txStatePartitionStorage) -> {
+ try {
+ replicaMgr.startReplica(replicaGrpId,
+ new PartitionReplicaListener(
+ mvPartitionStorage,
+ updatedRaftGroupService,
+ txManager,
+ lockMgr,
+ partId,
+ tblId,
+ primaryIndex,
+ clock,
+ txStatePartitionStorage,
+ topologyService,
+ placementDriver
+ )
+ );
+ } catch (NodeStoppingException ex) {
+ throw new AssertionError("Loza was stopped before Table manager", ex);
+ }
+
+ return null;
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ })
+ .exceptionally(th -> {
LOG.warn("Unable to update raft groups on the node", th);
return null;
@@ -821,18 +858,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture.allOf(futures).join();
}
- private PartitionDataStorage partitionDataStorage(MvPartitionStorage partitionStorage, InternalTable internalTbl, int partId) {
- return new SnapshotAwarePartitionDataStorage(partitionStorage, outgoingSnapshotsManager, partitionKey(internalTbl, partId));
- }
-
- private PartitionKey partitionKey(InternalTable internalTbl, int partId) {
- return new PartitionKey(internalTbl.tableId(), partId);
- }
-
- private static MvPartitionStorage getOrCreateMvPartition(InternalTable internalTbl, int partId) {
- return internalTbl.storage().getOrCreateMvPartition(partId);
- }
-
/**
* Calculates the quantity of the data nodes for the partition of the table.
*
@@ -860,15 +885,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
private RaftGroupOptions groupOptionsForPartition(
- InternalTable internalTbl,
- int partId,
- ExtendedTableConfiguration tableConfig,
- MvPartitionStorage partitionStorage,
+ MvTableStorage mvTableStorage,
+ TxStateTableStorage txStateTableStorage,
+ PartitionKey partitionKey,
Set<ClusterNode> peers
) {
RaftGroupOptions raftGroupOptions;
- if (internalTbl.storage().isVolatile()) {
+ if (mvTableStorage.isVolatile()) {
raftGroupOptions = RaftGroupOptions.forVolatileStores()
.setLogStorageFactory(volatileLogStorageFactoryCreator.factory(raftMgr.volatileRaft().logStorage().value()))
.raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage());
@@ -881,9 +905,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
raftMgr.topologyService(),
//TODO IGNITE-17302 Use miniumum from mv storage and tx state storage.
outgoingSnapshotsManager,
- new PartitionAccessImpl(partitionKey(internalTbl, partId), partitionStorage),
+ new PartitionAccessImpl(partitionKey, mvTableStorage, txStateTableStorage),
peers.stream().map(n -> new Peer(n.address())).map(PeerId::fromPeer).map(Object::toString).collect(Collectors.toList()),
- List.of()
+ List.of(),
+ incomingSnapshotsExecutor
));
return raftGroupOptions;
@@ -910,6 +935,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(txStateStorageScheduledPool, 10, TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(incomingSnapshotsExecutor, 10, TimeUnit.SECONDS);
}
/**
@@ -1656,11 +1682,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
- // TODO: IGNITE-17197 Remove assert after the ticket is resolved.
- assert tbl.internalTable().storage() instanceof MvTableStorage :
- "Only multi version storages are supported. Current storage is a "
- + tbl.internalTable().storage().getClass().getName();
-
// Stable assignments from the meta store, which revision is bounded by the current pending event.
byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
pendingAssignmentsWatchEvent.revision()).join().value();
@@ -1680,25 +1701,31 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
+ InternalTable internalTable = tbl.internalTable();
+
try {
LOG.info("Received update on pending assignments. Check if new raft group should be started"
+ " [key={}, partition={}, table={}, localMemberAddress={}]",
pendingAssignmentsWatchEvent.key(), partId, tbl.name(), localMember.address());
if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
- MvPartitionStorage partitionStorage = getOrCreateMvPartition(tbl.internalTable(), partId);
+ MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(internalTable.storage(), partId);
+
+ TxStateStorage txStatePartitionStorage = getOrCreateTxStatePartitionStorage(
+ internalTable.txStateStorage(),
+ partId
+ );
RaftGroupOptions groupOptions = groupOptionsForPartition(
- tbl.internalTable(),
- partId,
- tblCfg,
- partitionStorage,
+ internalTable.storage(),
+ internalTable.txStateStorage(),
+ partitionKey(internalTable, partId),
assignments
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
- partitionDataStorage(partitionStorage, tbl.internalTable(), partId),
- tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
+ partitionDataStorage(mvPartitionStorage, internalTable, partId),
+ txStatePartitionStorage,
txManager,
primaryIndex
);
@@ -1709,7 +1736,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
replicaGrpId,
partId,
busyLock,
- movePartition(() -> tbl.internalTable().partitionRaftGroupService(partId)),
+ movePartition(() -> internalTable.partitionRaftGroupService(partId)),
TableManager.this::calculateAssignments,
rebalanceScheduler
);
@@ -1724,19 +1751,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
if (replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
- MvPartitionStorage partitionStorage = getOrCreateMvPartition(tbl.internalTable(), partId);
+ MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(internalTable.storage(), partId);
+
+ TxStateStorage txStatePartitionStorage = getOrCreateTxStatePartitionStorage(
+ internalTable.txStateStorage(),
+ partId
+ );
replicaMgr.startReplica(replicaGrpId,
new PartitionReplicaListener(
- partitionStorage,
- tbl.internalTable().partitionRaftGroupService(partId),
+ mvPartitionStorage,
+ internalTable.partitionRaftGroupService(partId),
txManager,
lockMgr,
partId,
tblId,
primaryIndex,
clock,
- tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
+ txStatePartitionStorage,
raftMgr.topologyService(),
placementDriver
)
@@ -1754,7 +1786,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var newNodes = newPeers.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
- RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(partId);
+ RaftGroupService partGrpSvc = internalTable.partitionRaftGroupService(partId);
IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
@@ -1922,4 +1954,60 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private <T extends ConfigurationProperty<?>> T directProxy(T property) {
return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
}
+
+ private PartitionDataStorage partitionDataStorage(MvPartitionStorage partitionStorage, InternalTable internalTbl, int partId) {
+ return new SnapshotAwarePartitionDataStorage(partitionStorage, outgoingSnapshotsManager, partitionKey(internalTbl, partId));
+ }
+
+ private PartitionKey partitionKey(InternalTable internalTbl, int partId) {
+ return new PartitionKey(internalTbl.tableId(), partId);
+ }
+
+ /**
+ * Returns or creates multi-versioned partition storage.
+ *
+ * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
+ * in when the rebalance was interrupted.
+ *
+ * @param mvTableStorage Multi-versioned table storage.
+ * @param partId Partition ID.
+ */
+ private static MvPartitionStorage getOrCreateMvPartition(MvTableStorage mvTableStorage, int partId) {
+ MvPartitionStorage mvPartitionStorage = mvTableStorage.getOrCreateMvPartition(partId);
+
+ // If a full rebalance did not happen, then we return the storage as is.
+ if (mvPartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
+ return mvPartitionStorage;
+ }
+
+ // A full rebalance was started but not completed, so the partition must be recreated to remove the garbage.
+ mvTableStorage.destroyPartition(partId);
+
+ return mvTableStorage.getOrCreateMvPartition(partId);
+ }
+
+ /**
+ * Returns or creates transaction state storage for a partition.
+ *
+ * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
+ * in when the rebalance was interrupted.
+ *
+ * @param txStateTableStorage Transaction state storage for a table.
+ * @param partId Partition ID.
+ */
+ private static TxStateStorage getOrCreateTxStatePartitionStorage(
+ TxStateTableStorage txStateTableStorage,
+ int partId
+ ) {
+ TxStateStorage txStatePartitionStorage = txStateTableStorage.getOrCreateTxStateStorage(partId);
+
+ // If a full rebalance did not happen, then we return the storage as is.
+ if (txStatePartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
+ return txStatePartitionStorage;
+ }
+
+ txStateTableStorage.destroyTxStateStorage(partId);
+
+ return txStateTableStorage.getOrCreateTxStateStorage(partId);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index 1ab37e5bc8..a0abd1899d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
-import java.util.List;
-import org.apache.ignite.internal.storage.ReadResult;
-import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
/**
* Small abstractions for partition storages that includes only methods, mandatory for the snapshot storage.
@@ -29,31 +27,30 @@ import org.jetbrains.annotations.Nullable;
public interface PartitionAccess {
/**
* Returns the key that uniquely identifies the corresponding partition.
- *
- * @return Partition key.
*/
- PartitionKey key();
+ PartitionKey partitionKey();
+
+ /**
+ * Returns the multi-versioned partition storage.
+ */
+ MvPartitionStorage mvPartitionStorage();
/**
- * Returns persisted RAFT index for the partition.
+ * Returns transaction state storage for the partition.
*/
- long persistedIndex();
+ TxStateStorage txStatePartitionStorage();
/**
- * Returns a row id, existing in the storage, that's greater or equal than the lower bound. {@code null} if not found.
+ * Destroys and recreates the multi-versioned partition storage.
*
- * @param lowerBound Lower bound.
- * @throws StorageException If failed to read data from the storage.
+ * @throws StorageException If an error has occurred during the partition destruction.
*/
- @Nullable
- RowId closestRowId(RowId lowerBound);
+ MvPartitionStorage reCreateMvPartitionStorage() throws StorageException;
     /**
-     * Returns all versions of a row identified with the given {@link RowId}.
-     * The returned versions are in newest-to-oldest order.
+     * Destroys and recreates the transaction state storage for the partition.
      *
-     * @param rowId Id of the row.
-     * @return All versions of the row.
+     * @throws StorageException If an error has occurred during the destruction of the transaction state storage for the partition.
      */
-    List<ReadResult> rowVersions(RowId rowId);
+    TxStateStorage reCreateTxStatePartitionStorage() throws StorageException;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index cf20f2c9d8..d847f03095 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -17,55 +17,85 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.ReadResult;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.util.Cursor;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
/**
- * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ * {@link PartitionAccess} implementation.
*/
public class PartitionAccessImpl implements PartitionAccess {
private final PartitionKey partitionKey;
- private final MvPartitionStorage partitionStorage;
- public PartitionAccessImpl(PartitionKey partitionKey, MvPartitionStorage partitionStorage) {
+ private final MvTableStorage mvTableStorage;
+
+ private final TxStateTableStorage txStateTableStorage;
+
+ /**
+ * Constructor.
+ *
+ * @param partitionKey Partition key.
+ * @param mvTableStorage Multi version table storage.
+ * @param txStateTableStorage Table transaction state storage.
+ */
+ public PartitionAccessImpl(
+ PartitionKey partitionKey,
+ MvTableStorage mvTableStorage,
+ TxStateTableStorage txStateTableStorage
+ ) {
this.partitionKey = partitionKey;
- this.partitionStorage = partitionStorage;
+ this.mvTableStorage = mvTableStorage;
+ this.txStateTableStorage = txStateTableStorage;
}
@Override
- public PartitionKey key() {
+ public PartitionKey partitionKey() {
return partitionKey;
}
@Override
- public long persistedIndex() {
- return partitionStorage.persistedIndex();
+ public MvPartitionStorage mvPartitionStorage() {
+ MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(partId());
+
+ assert mvPartition != null : "table=" + tableName() + ", part=" + partId();
+
+ return mvPartition;
+ }
+
+ @Override
+ public TxStateStorage txStatePartitionStorage() {
+ TxStateStorage txStatePartitionStorage = txStateTableStorage.getTxStateStorage(partId());
+
+ assert txStatePartitionStorage != null : "table=" + tableName() + ", part=" + partId();
+
+ return txStatePartitionStorage;
}
@Override
- public @Nullable RowId closestRowId(RowId lowerBound) {
- return partitionStorage.closestRowId(lowerBound);
+ public MvPartitionStorage reCreateMvPartitionStorage() throws StorageException {
+ assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
+
+ mvTableStorage.destroyPartition(partId());
+
+ return mvTableStorage.getOrCreateMvPartition(partId());
}
@Override
- public List<ReadResult> rowVersions(RowId rowId) {
- try (Cursor<ReadResult> cursor = partitionStorage.scanVersions(rowId)) {
- List<ReadResult> versions = new ArrayList<>();
+ public TxStateStorage reCreateTxStatePartitionStorage() throws StorageException {
+ assert txStateTableStorage.getTxStateStorage(partId()) != null : "table=" + tableName() + ", part=" + partId();
- for (ReadResult version : cursor) {
- versions.add(version);
- }
+ txStateTableStorage.destroyTxStateStorage(partId());
- return versions;
- } catch (Exception e) {
- // TODO: IGNITE-17935 - handle this
+ return txStateTableStorage.getOrCreateTxStateStorage(partId());
+ }
+
+ private int partId() {
+ return partitionKey.partitionId();
+ }
- throw new RuntimeException(e);
- }
+ private String tableName() {
+ return mvTableStorage.configuration().name().value();
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java
index dd914c39f1..683fa67fce 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java
@@ -26,12 +26,11 @@ import org.apache.ignite.internal.tostring.S;
*/
public class PartitionKey {
private final UUID tableId;
+
private final int partitionId;
/**
* Returns ID of the table.
- *
- * @return ID of the table.
*/
public UUID tableId() {
return tableId;
@@ -39,8 +38,6 @@ public class PartitionKey {
/**
* Returns partition ID.
- *
- * @return Partition ID.
*/
public int partitionId() {
return partitionId;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
index 49315d430d..b9b73c9eca 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
@@ -58,6 +59,9 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
/** Snapshot meta, constructed from the storage data and raft group configuration. */
private final SnapshotMeta snapshotMeta;
+ /** Incoming snapshots executor. */
+ private final Executor incomingSnapshotsExecutor;
+
/** Snapshot throttle instance. */
@Nullable
private SnapshotThrottle snapshotThrottle;
@@ -74,6 +78,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
* @param raftOptions RAFT options.
* @param partition Partition.
* @param snapshotMeta Snapshot meta.
+ * @param incomingSnapshotsExecutor Incoming snapshots executor.
*/
public PartitionSnapshotStorage(
TopologyService topologyService,
@@ -81,7 +86,8 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
String snapshotUri,
RaftOptions raftOptions,
PartitionAccess partition,
- SnapshotMeta snapshotMeta
+ SnapshotMeta snapshotMeta,
+ Executor incomingSnapshotsExecutor
) {
this.topologyService = topologyService;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
@@ -89,6 +95,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
this.raftOptions = raftOptions;
this.partition = partition;
this.snapshotMeta = snapshotMeta;
+ this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
}
/**
@@ -136,37 +143,39 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
/**
* Returns a snapshot throttle instance.
*/
- public SnapshotThrottle snapshotThrottle() {
+ public @Nullable SnapshotThrottle snapshotThrottle() {
return snapshotThrottle;
}
- /** {@inheritDoc} */
+ /**
+ * Returns the incoming snapshots executor.
+ */
+ public Executor getIncomingSnapshotsExecutor() {
+ return incomingSnapshotsExecutor;
+ }
+
@Override
public boolean init(Void opts) {
// No-op.
return true;
}
- /** {@inheritDoc} */
@Override
public void shutdown() {
// No-op.
}
- /** {@inheritDoc} */
@Override
public boolean setFilterBeforeCopyRemote() {
// Option is not supported.
return false;
}
- /** {@inheritDoc} */
@Override
public SnapshotWriter create() {
return new PartitionSnapshotWriter(this);
}
- /** {@inheritDoc} */
@Override
public SnapshotReader open() {
if (startupSnapshotOpened.compareAndSet(false, true)) {
@@ -176,13 +185,11 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
return new OutgoingSnapshotReader(this);
}
- /** {@inheritDoc} */
@Override
public SnapshotReader copyFrom(String uri, SnapshotCopierOptions opts) {
throw new UnsupportedOperationException("Synchronous snapshot copy is not supported.");
}
- /** {@inheritDoc} */
@Override
public SnapshotCopier startToCopyFrom(String uri, SnapshotCopierOptions opts) {
SnapshotUri snapshotUri = SnapshotUri.fromStringUri(uri);
@@ -194,7 +201,6 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
return copier;
}
- /** {@inheritDoc} */
@Override
public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle) {
this.snapshotThrottle = snapshotThrottle;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 93a0fa9052..e0301bf1bf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
import java.util.List;
+import java.util.concurrent.Executor;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -31,7 +32,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
/**
* Snapshot storage factory for {@link MvPartitionStorage}. Utilizes the fact that every partition already stores its latest applied index
- * and thus can inself be used as its own snapshot.
+ * and thus can itself be used as its own snapshot.
*
* <p/>Uses {@link MvPartitionStorage#persistedIndex()} and configuration, passed into constructor, to create a {@link SnapshotMeta} object
* in {@link SnapshotReader#load()}.
@@ -55,9 +56,12 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
/** List of learners. */
private final List<String> learners;
- /** RAFT log index read from {@link PartitionAccess#persistedIndex()} during factory instantiation. */
+ /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()} during factory instantiation. */
private final long persistedRaftIndex;
+ /** Incoming snapshots executor. */
+ private final Executor incomingSnapshotsExecutor;
+
/**
* Constructor.
*
@@ -66,6 +70,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
* @param partition MV partition storage.
* @param peers List of raft group peers to be used in snapshot meta.
* @param learners List of raft group learners to be used in snapshot meta.
+ * @param incomingSnapshotsExecutor Incoming snapshots executor.
* @see SnapshotMeta
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@@ -74,18 +79,19 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
OutgoingSnapshotsManager outgoingSnapshotsManager,
PartitionAccess partition,
List<String> peers,
- List<String> learners
+ List<String> learners,
+ Executor incomingSnapshotsExecutor
) {
this.topologyService = topologyService;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
this.partition = partition;
this.peers = peers;
this.learners = learners;
+ this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
- persistedRaftIndex = partition.persistedIndex();
+ persistedRaftIndex = partition.mvPartitionStorage().persistedIndex();
}
- /** {@inheritDoc} */
@Override
public SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
@@ -103,7 +109,8 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
uri,
raftOptions,
partition,
- snapshotMeta
+ snapshotMeta,
+ incomingSnapshotsExecutor
);
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java
index 341d96d4f6..ac49771516 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java
@@ -30,7 +30,7 @@ public class SnapshotUri {
* Creates a string representation of the snapshot URI.
*
* @param snapshotId Snapshot id.
- * @param nodeName Sender node name.
+ * @param nodeName Sender node (consistent id) name.
*/
public static String toStringUri(UUID snapshotId, String nodeName) {
return nodeName + "-" + snapshotId;
@@ -52,7 +52,7 @@ public class SnapshotUri {
/** Snapshot id. */
public final UUID snapshotId;
- /** Sender node name. */
+ /** Sender node (consistent id) name. */
public final String nodeName;
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 48d5504d31..da699b96a9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -17,94 +17,159 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
-import java.io.IOException;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.table.distributed.TableManager.FULL_RABALANCING_STARTED;
+
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.MessagingService;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
-import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.jetbrains.annotations.Nullable;
/**
* Snapshot copier implementation for partitions. Used to stream partition data from the leader to the local node.
*/
public class IncomingSnapshotCopier extends SnapshotCopier {
- /** Messages factory. */
+ private static final IgniteLogger LOG = Loggers.forClass(IncomingSnapshotCopier.class);
+
private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
- /** {@link SnapshotStorage} instance for the partition. */
- private final PartitionSnapshotStorage snapshotStorage;
+ private static final long NETWORK_TIMEOUT = Long.MAX_VALUE;
- /** Snapshot URI. */
- private final SnapshotUri snapshotUri;
+ private final PartitionSnapshotStorage partitionSnapshotStorage;
- /** Rebalance thread-pool, used to write data into a storage. */
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
- // Use external pool.
- private final ExecutorService threadPool = Executors.newSingleThreadExecutor();
+ private final SnapshotUri snapshotUri;
/**
* Snapshot meta read from the leader.
*
* @see SnapshotMetaRequest
*/
- private SnapshotMeta snapshotMeta;
+ @Nullable
+ private volatile SnapshotMeta snapshotMeta;
+
+ private volatile boolean canceled;
+
+ @Nullable
+ private volatile CompletableFuture<?> future;
/**
* Constructor.
*
- * @param snapshotStorage Snapshot storage.
+ * @param partitionSnapshotStorage Snapshot storage.
* @param snapshotUri Snapshot URI.
*/
- public IncomingSnapshotCopier(PartitionSnapshotStorage snapshotStorage, SnapshotUri snapshotUri) {
- this.snapshotStorage = snapshotStorage;
+ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage, SnapshotUri snapshotUri) {
+ this.partitionSnapshotStorage = partitionSnapshotStorage;
this.snapshotUri = snapshotUri;
}
@Override
- public void cancel() {
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
- // Implement.
+    public void start() {
+        Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
+
+        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+
+        future = prepareMvPartitionStorageForRebalance(executor)
+                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+                .thenCompose(unused -> {
+                    ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
+
+                    if (snapshotSender == null) {
+                        LOG.error(
+                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
+                                partId(),
+                                tableId(),
+                                snapshotUri.nodeName
+                        );
+                        // Record the failure unless an error has already been set; otherwise the copier would still report OK.
+                        if (isOk()) {
+                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
+                        }
+
+                        return completedFuture(null);
+                    }
+
+                    return loadSnapshotMeta(snapshotSender)
+                            .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
+                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                });
     }
@Override
public void join() throws InterruptedException {
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
- // Implement proper join.
+ CompletableFuture<?> fut = future;
+
+ if (fut != null) {
+ try {
+ fut.get();
+
+ if (canceled && !isOk()) {
+ setError(RaftError.ECANCELED, "Copier is cancelled");
+ }
+ } catch (CancellationException e) {
+ // Ignored.
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+
+ LOG.error("Error when completing the copier", cause);
+
+ if (!isOk()) {
+ setError(RaftError.UNKNOWN, "Unknown error on completion the copier");
+ }
+
+ // By analogy with LocalSnapshotCopier#join.
+ throw new IllegalStateException(cause);
+ }
+ }
}
@Override
- public void start() {
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
- // What if node can't be resolved?
- ClusterNode sourceNode = snapshotStorage.topologyService().getByConsistentId(snapshotUri.nodeName);
-
- MessagingService messagingService = snapshotStorage.outgoingSnapshotsManager().messagingService();
-
- threadPool.submit(() -> {
- //TODO https://issues.apache.org/jira/browse/IGNITE-17262
- // Following code is just an example of what I expect and shouldn't be considered a template.
- CompletableFuture<NetworkMessage> metaRequestFuture = messagingService.invoke(
- sourceNode,
- MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
- 1000L
- );
-
- metaRequestFuture.whenComplete((networkMessage, throwable) -> {
- SnapshotMetaResponse metaResponse = (SnapshotMetaResponse) networkMessage;
-
- snapshotMeta = metaResponse.meta();
- });
- });
+    public void cancel() {
+        canceled = true;
+
+        LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+
+        CompletableFuture<?> fut = future;
+        // NOTE(review): join() waits for in-flight stages (NETWORK_TIMEOUT = Long.MAX_VALUE) and may throw IllegalStateException.
+        if (fut != null) {
+            try {
+                // Because after the cancellation, no one waits for #join.
+                join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+ @Override
+ public void close() {
+ // No-op.
}
@Override
@@ -113,8 +178,236 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
return new IncomingSnapshotReader(snapshotMeta);
}
- @Override
- public void close() throws IOException {
- threadPool.shutdownNow();
+    /**
+     * Prepares the {@link MvPartitionStorage} for a full rebalance.
+     *
+     * <p>Recreates {@link MvPartitionStorage} and sets the last applied index to {@link TableManager#FULL_RABALANCING_STARTED} so that
+     * when the node is restarted, we can understand that the full rebalance has not completed, and we need to clean up the storage from
+     * garbage.
+     */
+    private CompletableFuture<?> prepareMvPartitionStorageForRebalance(Executor executor) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return CompletableFuture.supplyAsync(() -> partitionSnapshotStorage.partition().reCreateMvPartitionStorage(), executor)
+                .thenCompose(mvPartitionStorage -> {
+                    if (canceled) {
+                        return completedFuture(null);
+                    }
+
+                    mvPartitionStorage.runConsistently(() -> {
+                        mvPartitionStorage.lastAppliedIndex(FULL_RABALANCING_STARTED);
+
+                        return null;
+                    });
+
+                    LOG.info("Copier prepared multi-versioned storage for the partition [partId={}, tableId={}]", partId(), tableId());
+
+                    return completedFuture(null);
+                });
+    }
+
+    /**
+     * Prepares the {@link TxStateStorage} for a full rebalance.
+     *
+     * <p>Recreates {@link TxStateStorage} and sets the last applied index to {@link TableManager#FULL_RABALANCING_STARTED} so that when
+     * the node is restarted, we can understand that the full rebalance has not completed, and we need to clean up the storage from
+     * garbage.
+     */
+    private CompletableFuture<?> prepareTxStatePartitionStorageForRebalance(Executor executor) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return CompletableFuture.supplyAsync(() -> partitionSnapshotStorage.partition().reCreateTxStatePartitionStorage(), executor)
+                .thenCompose(txStatePartitionStorage -> {
+                    if (canceled) {
+                        return completedFuture(null);
+                    }
+
+                    txStatePartitionStorage.lastAppliedIndex(FULL_RABALANCING_STARTED);
+
+                    LOG.info("Copier prepared transaction state storage for the partition [partId={}, tableId={}]", partId(), tableId());
+
+                    return completedFuture(null);
+                });
+    }
+
+ private @Nullable ClusterNode getSnapshotSender(String nodeName) {
+ return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);
+ }
+
+ /**
+ * Requests and saves the snapshot meta in {@link #snapshotMeta}.
+ */
+ private CompletableFuture<?> loadSnapshotMeta(ClusterNode snapshotSender) {
+ if (canceled) {
+ return completedFuture(null);
+ }
+
+ return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+ snapshotSender,
+ MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
+ NETWORK_TIMEOUT
+ ).thenAccept(response -> {
+ snapshotMeta = ((SnapshotMetaResponse) response).meta();
+
+ LOG.info("Copier has loaded the snapshot meta for the partition [partId={}, tableId={}]", partId(), tableId());
+ });
+ }
+
+ /**
+ * Requests and stores data into {@link MvPartitionStorage}.
+ */
+ private CompletableFuture<?> loadSnapshotMvData(ClusterNode snapshotSender, Executor executor) {
+ if (canceled) {
+ return completedFuture(null);
+ }
+
+ return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+ snapshotSender,
+ MSG_FACTORY.snapshotMvDataRequest().id(snapshotUri.snapshotId).build(),
+ NETWORK_TIMEOUT
+ ).thenComposeAsync(response -> {
+ SnapshotMvDataResponse snapshotMvDataResponse = ((SnapshotMvDataResponse) response);
+
+ MvPartitionStorage mvPartition = partitionSnapshotStorage.partition().mvPartitionStorage();
+
+ for (ResponseEntry entry : snapshotMvDataResponse.rows()) {
+ if (canceled) {
+ return completedFuture(null);
+ }
+
+ // Let's write all versions for the row ID.
+ mvPartition.runConsistently(() -> {
+ RowId rowId = new RowId(partId(), entry.rowId());
+
+ for (int i = 0; i < entry.rowVersions().size(); i++) {
+ HybridTimestamp timestamp = i < entry.timestamps().size() ? entry.timestamps().get(i) : null;
+
+ BinaryRow binaryRow = new ByteBufferRow(entry.rowVersions().get(i).rewind());
+
+ if (timestamp == null) {
+ // Writes an intent to write (uncommitted version).
+ assert entry.txId() != null;
+ assert entry.commitTableId() != null;
+ assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+ mvPartition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
+ } else {
+ // Writes committed version.
+ mvPartition.addWriteCommitted(rowId, binaryRow, timestamp);
+ }
+ }
+
+ return null;
+ });
+ }
+
+ if (snapshotMvDataResponse.finish()) {
+ LOG.info(
+ "Copier has finished loading multi-versioned data [partId={}, rows={}]",
+ partId(),
+ snapshotMvDataResponse.rows().size()
+ );
+
+ return completedFuture(null);
+ } else {
+ LOG.info(
+ "Copier has loaded a portion of multi-versioned data [partId={}, rows={}]",
+ partId(),
+ snapshotMvDataResponse.rows().size()
+ );
+
+ // Let's upload the rest.
+ return loadSnapshotMvData(snapshotSender, executor);
+ }
+ }, executor);
+ }
+
+ /**
+ * Requests and stores data into {@link TxStateStorage}.
+ */
+ private CompletableFuture<?> loadSnapshotTxData(ClusterNode snapshotSender, Executor executor) {
+ if (canceled) {
+ return completedFuture(null);
+ }
+
+ return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+ snapshotSender,
+ MSG_FACTORY.snapshotTxDataRequest().id(snapshotUri.snapshotId).build(),
+ NETWORK_TIMEOUT
+ ).thenComposeAsync(response -> {
+ SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;
+
+ TxStateStorage txStatePartitionStorage = partitionSnapshotStorage.partition().txStatePartitionStorage();
+
+ assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size();
+
+ for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
+ if (canceled) {
+ return completedFuture(null);
+ }
+
+ txStatePartitionStorage.put(snapshotTxDataResponse.txIds().get(i), snapshotTxDataResponse.txMeta().get(i));
+ }
+
+ if (snapshotTxDataResponse.finish()) {
+ LOG.info(
+ "Copier has finished loading transaction meta [partId={}, metas={}]",
+ partId(),
+ snapshotTxDataResponse.txMeta().size()
+ );
+
+ return completedFuture(null);
+ } else {
+ LOG.info(
+ "Copier has loaded a portion of transaction meta [partId={}, metas={}]",
+ partId(),
+ snapshotTxDataResponse.txMeta().size()
+ );
+
+ // Let's upload the rest.
+ return loadSnapshotTxData(snapshotSender, executor);
+ }
+ }, executor);
+ }
+
+ /**
+ * Updates the last applied index for {@link MvPartitionStorage} and {@link TxStateStorage} from the {@link #snapshotMeta}.
+ */
+ private void updateLastAppliedIndexFromSnapshotMetaForStorages() {
+ if (canceled) {
+ return;
+ }
+
+ TxStateStorage txStatePartitionStorage = partitionSnapshotStorage.partition().txStatePartitionStorage();
+ MvPartitionStorage mvPartitionStorage = partitionSnapshotStorage.partition().mvPartitionStorage();
+
+ SnapshotMeta meta = snapshotMeta;
+
+ assert meta != null;
+
+ mvPartitionStorage.runConsistently(() -> {
+ txStatePartitionStorage.lastAppliedIndex(meta.lastIncludedIndex());
+ mvPartitionStorage.lastAppliedIndex(meta.lastIncludedIndex());
+
+ return null;
+ });
+
+ LOG.info(
+ "Copier has finished updating last applied index for the partition [partId={}, lastAppliedIndex={}]",
+ partId(),
+ meta.lastIncludedIndex()
+ );
+ }
+
+ private int partId() {
+ return partitionSnapshotStorage.partition().partitionKey().partitionId();
+ }
+
+ private UUID tableId() {
+ return partitionSnapshotStorage.partition().partitionKey().tableId();
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index edb233402a..1a6ff44f96 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+import static java.util.stream.Collectors.toList;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -103,7 +105,7 @@ public class OutgoingSnapshot {
this.partition = partition;
this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
- lastRowId = RowId.lowestRowId(partition.key().partitionId());
+ lastRowId = RowId.lowestRowId(partition.partitionKey().partitionId());
}
/**
@@ -119,7 +121,7 @@ public class OutgoingSnapshot {
* @return Partition key.
*/
public PartitionKey partitionKey() {
- return partition.key();
+ return partition.partitionKey();
}
/**
@@ -221,11 +223,11 @@ public class OutgoingSnapshot {
}
if (!startedToReadPartition) {
- lastRowId = partition.closestRowId(lastRowId);
+ lastRowId = partition.mvPartitionStorage().closestRowId(lastRowId);
startedToReadPartition = true;
} else {
- lastRowId = partition.closestRowId(lastRowId.increment());
+ lastRowId = partition.mvPartitionStorage().closestRowId(lastRowId.increment());
}
if (!exhaustedPartition()) {
@@ -250,7 +252,7 @@ public class OutgoingSnapshot {
}
private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId) {
- List<ReadResult> rowVersionsN2O = partition.rowVersions(rowId);
+ List<ReadResult> rowVersionsN2O = partition.mvPartitionStorage().scanVersions(rowId).stream().collect(toList());
List<ByteBuffer> buffers = new ArrayList<>(rowVersionsN2O.size());
List<HybridTimestamp> commitTimestamps = new ArrayList<>(rowVersionsN2O.size());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index b3fa50a288..4557970b9b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -50,7 +50,7 @@ public class OutgoingSnapshotReader extends SnapshotReader {
//TODO https://issues.apache.org/jira/browse/IGNITE-17935
// This meta is wrong, we need a right one.
snapshotMeta = new RaftMessagesFactory().snapshotMeta()
- .lastIncludedIndex(snapshotStorage.partition().persistedIndex())
+ .lastIncludedIndex(snapshotStorage.partition().mvPartitionStorage().persistedIndex())
.lastIncludedTerm(snapshotStorage.startupSnapshotMeta().lastIncludedTerm())
.peersList(snapshotStorage.startupSnapshotMeta().peersList())
.learnersList(snapshotStorage.startupSnapshotMeta().learnersList())
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
deleted file mode 100644
index 247ac674ec..0000000000
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.raft.snapshot;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.ReadResult;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.util.Cursor;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-class PartitionAccessImplTest {
- @Mock
- private MvPartitionStorage partitionStorage;
-
- private PartitionAccessImpl access;
-
- private final PartitionKey key = new PartitionKey(UUID.randomUUID(), 1);
-
- @BeforeEach
- void createTestInstance() {
- access = new PartitionAccessImpl(key, partitionStorage);
- }
-
- @Test
- void returnsProvidedKey() {
- assertThat(access.key(), is(key));
- }
-
- @Test
- void persistedIndexDelegatesToStorage() {
- when(partitionStorage.persistedIndex()).thenReturn(42L);
-
- assertThat(access.persistedIndex(), is(42L));
- }
-
- @Test
- void minRowIdDelegatesToStorage() {
- RowId argRowId = new RowId(1);
- RowId resultRowId = new RowId(1);
-
- when(partitionStorage.closestRowId(any())).thenReturn(resultRowId);
-
- assertThat(access.closestRowId(argRowId), is(resultRowId));
- }
-
- @Test
- void returnsRowVersionsFromStorage() {
- ReadResult result1 = mock(ReadResult.class);
- ReadResult result2 = mock(ReadResult.class);
-
- when(partitionStorage.scanVersions(any()))
- .thenReturn(Cursor.fromIterator(List.of(result1, result2).iterator()));
-
- List<ReadResult> versions = access.rowVersions(new RowId(1));
- assertThat(versions, is(equalTo(List.of(result1, result2))));
- }
-}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
new file mode 100644
index 0000000000..7c82c79035
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateTableStorage;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link IncomingSnapshotCopier}: a snapshot is served by a mocked remote node backed by "outgoing" storages and
+ * copied into "incoming" storages, after which the copied data is compared with the source data.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class IncomingSnapshotCopierTest {
+    private static final String NODE_NAME = "node";
+
+    private static final int TEST_PARTITION = 0;
+
+    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("key", NativeTypes.stringOf(256), false)},
+            new Column[]{new Column("value", NativeTypes.stringOf(256), false)}
+    );
+
+    private static final HybridClock HYBRID_CLOCK = new HybridClock();
+
+    private static final TableMessagesFactory TABLE_MSG_FACTORY = new TableMessagesFactory();
+
+    private static final RaftMessagesFactory RAFT_MSG_FACTORY = new RaftMessagesFactory();
+
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    @InjectConfiguration(value = "mock.tables.foo {}")
+    private TablesConfiguration tablesConfig;
+
+    @AfterEach
+    void tearDown() {
+        shutdownAndAwaitTermination(executorService, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Copies a snapshot with several MV rows (committed versions and write intents) and transaction states, then checks
+     * that the incoming partition storages were recreated and received the same data and the same last applied index.
+     */
+    @Test
+    void test() throws Exception {
+        MvPartitionStorage outgoingMvPartitionStorage = new TestMvPartitionStorage(TEST_PARTITION);
+        TxStateStorage outgoingTxStatePartitionStorage = new TestConcurrentHashMapTxStateStorage();
+
+        long expLastAppliedIndex = 100500L;
+
+        List<RowId> rowIds = fillMvPartitionStorage(outgoingMvPartitionStorage);
+        List<UUID> txIds = fillTxStatePartitionStorage(outgoingTxStatePartitionStorage);
+
+        outgoingMvPartitionStorage.lastAppliedIndex(expLastAppliedIndex);
+        outgoingTxStatePartitionStorage.lastAppliedIndex(expLastAppliedIndex);
+
+        UUID snapshotId = UUID.randomUUID();
+
+        MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(tablesConfig.tables().get("foo"), tablesConfig));
+        TxStateTableStorage incomingTxStateTableStorage = spy(new TestConcurrentHashMapTxStateTableStorage());
+
+        incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
+        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+
+        PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage(
+                snapshotId,
+                expLastAppliedIndex,
+                incomingMvTableStorage,
+                incomingTxStateTableStorage,
+                outgoingMvPartitionStorage,
+                outgoingTxStatePartitionStorage,
+                rowIds,
+                txIds
+        );
+
+        SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom(
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(SnapshotCopierOptions.class)
+        );
+
+        runAsync(snapshotCopier::join).get(1, TimeUnit.SECONDS);
+
+        assertEquals(Status.OK().getCode(), snapshotCopier.getCode());
+
+        MvPartitionStorage incomingMvPartitionStorage = incomingMvTableStorage.getMvPartition(TEST_PARTITION);
+        TxStateStorage incomingTxStatePartitionStorage = incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION);
+
+        // The copier must propagate the last applied index from the snapshot meta into the *incoming* storages;
+        // checking the outgoing ones would only re-read values this test set itself.
+        assertEquals(expLastAppliedIndex, incomingMvPartitionStorage.lastAppliedIndex());
+        assertEquals(expLastAppliedIndex, incomingTxStatePartitionStorage.lastAppliedIndex());
+
+        assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
+        assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
+
+        // The copier is expected to recreate both incoming partition storages before filling them.
+        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
+
+        verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
+        verify(incomingTxStateTableStorage, times(2)).getOrCreateTxStateStorage(eq(TEST_PARTITION));
+    }
+
+    /**
+     * Creates a {@link PartitionSnapshotStorage} over the incoming storages. Its {@link MessagingService} is mocked to
+     * answer the copier's meta, MV data and TX data requests from the outgoing storages, each in a single final batch.
+     *
+     * @param snapshotId ID of the snapshot; every request is checked to carry it.
+     * @param lastAppliedIndexForSnapshotMeta Value returned as {@code lastIncludedIndex} of the snapshot meta.
+     * @param incomingTableStorage MV table storage the snapshot is copied into.
+     * @param incomingTxStateTableStorage TX state table storage the snapshot is copied into.
+     * @param outgoingMvPartitionStorage MV partition storage the sent rows are read from.
+     * @param outgoingTxStatePartitionStorage TX state storage the sent transaction metas are read from.
+     * @param rowIds IDs of the rows to send.
+     * @param txIds IDs of the transactions to send.
+     */
+    private PartitionSnapshotStorage createPartitionSnapshotStorage(
+            UUID snapshotId,
+            long lastAppliedIndexForSnapshotMeta,
+            MvTableStorage incomingTableStorage,
+            TxStateTableStorage incomingTxStateTableStorage,
+            MvPartitionStorage outgoingMvPartitionStorage,
+            TxStateStorage outgoingTxStatePartitionStorage,
+            List<RowId> rowIds,
+            List<UUID> txIds
+    ) {
+        TopologyService topologyService = mock(TopologyService.class);
+
+        ClusterNode clusterNode = mock(ClusterNode.class);
+
+        when(topologyService.getByConsistentId(NODE_NAME)).thenReturn(clusterNode);
+
+        OutgoingSnapshotsManager outgoingSnapshotsManager = mock(OutgoingSnapshotsManager.class);
+
+        MessagingService messagingService = mock(MessagingService.class);
+
+        when(messagingService.invoke(eq(clusterNode), any(SnapshotMetaRequest.class), anyLong())).then(answer -> {
+            SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotMetaRequest.id());
+
+            return completedFuture(
+                    TABLE_MSG_FACTORY.snapshotMetaResponse()
+                            .meta(RAFT_MSG_FACTORY.snapshotMeta().lastIncludedIndex(lastAppliedIndexForSnapshotMeta).build())
+                            .build()
+            );
+        });
+
+        when(messagingService.invoke(eq(clusterNode), any(SnapshotMvDataRequest.class), anyLong())).then(answer -> {
+            SnapshotMvDataRequest snapshotMvDataRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotMvDataRequest.id());
+
+            List<ResponseEntry> responseEntries = createSnapshotMvDataEntries(outgoingMvPartitionStorage, rowIds);
+
+            assertThat(responseEntries, not(empty()));
+
+            return completedFuture(TABLE_MSG_FACTORY.snapshotMvDataResponse().rows(responseEntries).finish(true).build());
+        });
+
+        when(messagingService.invoke(eq(clusterNode), any(SnapshotTxDataRequest.class), anyLong())).then(answer -> {
+            SnapshotTxDataRequest snapshotTxDataRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotTxDataRequest.id());
+
+            List<TxMeta> txMetas = txIds.stream().map(outgoingTxStatePartitionStorage::get).collect(toList());
+
+            return completedFuture(TABLE_MSG_FACTORY.snapshotTxDataResponse().txIds(txIds).txMeta(txMetas).finish(true).build());
+        });
+
+        when(outgoingSnapshotsManager.messagingService()).thenReturn(messagingService);
+
+        return new PartitionSnapshotStorage(
+                topologyService,
+                outgoingSnapshotsManager,
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(RaftOptions.class),
+                new PartitionAccessImpl(
+                        new PartitionKey(UUID.randomUUID(), TEST_PARTITION),
+                        incomingTableStorage,
+                        incomingTxStateTableStorage
+                ),
+                mock(SnapshotMeta.class),
+                executorService
+        );
+    }
+
+    /**
+     * Fills the storage with four rows: two with a single committed version, one with two committed versions followed by
+     * a write intent, and one with only a write intent.
+     *
+     * @return IDs of the written rows.
+     */
+    private static List<RowId> fillMvPartitionStorage(MvPartitionStorage storage) {
+        List<RowId> rowIds = List.of(
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION)
+        );
+
+        storage.runConsistently(() -> {
+            // Writes committed version.
+            storage.addWriteCommitted(rowIds.get(0), createBinaryRow("k0", "v0"), HYBRID_CLOCK.now());
+            storage.addWriteCommitted(rowIds.get(1), createBinaryRow("k1", "v1"), HYBRID_CLOCK.now());
+
+            storage.addWriteCommitted(rowIds.get(2), createBinaryRow("k20", "v20"), HYBRID_CLOCK.now());
+            storage.addWriteCommitted(rowIds.get(2), createBinaryRow("k21", "v21"), HYBRID_CLOCK.now());
+
+            // Writes an intent to write (uncommitted version).
+            storage.addWrite(rowIds.get(2), createBinaryRow("k22", "v22"), UUID.randomUUID(), UUID.randomUUID(), TEST_PARTITION);
+
+            storage.addWrite(
+                    rowIds.get(3),
+                    createBinaryRow("k3", "v3"),
+                    UUID.randomUUID(),
+                    UUID.randomUUID(),
+                    TEST_PARTITION
+            );
+
+            return null;
+        });
+
+        return rowIds;
+    }
+
+    /**
+     * Puts four transaction metas into the storage: two committed and two aborted. Each meta enlists
+     * {@link #TEST_PARTITION} of the same random table.
+     *
+     * @return IDs of the stored transactions.
+     */
+    private static List<UUID> fillTxStatePartitionStorage(TxStateStorage storage) {
+        List<UUID> txIds = List.of(
+                UUID.randomUUID(),
+                UUID.randomUUID(),
+                UUID.randomUUID(),
+                UUID.randomUUID()
+        );
+
+        UUID tableId = UUID.randomUUID();
+
+        storage.put(txIds.get(0), new TxMeta(COMMITED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        storage.put(txIds.get(1), new TxMeta(COMMITED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        storage.put(txIds.get(2), new TxMeta(ABORTED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        storage.put(txIds.get(3), new TxMeta(ABORTED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+
+        return txIds;
+    }
+
+    /**
+     * Builds one MV data response entry per row ID, reading the row versions from {@code storage}.
+     *
+     * <p>Versions are emitted in the reverse of {@code scanVersions} order. Commit timestamps are collected only for
+     * committed versions. Transaction ID and commit table/partition are taken from the write intent, if the row has one.
+     */
+    private static List<ResponseEntry> createSnapshotMvDataEntries(MvPartitionStorage storage, List<RowId> rowIds) {
+        List<ResponseEntry> responseEntries = new ArrayList<>();
+
+        for (RowId rowId : rowIds) {
+            List<ReadResult> readResults = storage.scanVersions(rowId).stream().collect(toList());
+
+            Collections.reverse(readResults);
+
+            List<ByteBuffer> rowVersions = new ArrayList<>();
+            List<HybridTimestamp> timestamps = new ArrayList<>();
+
+            UUID txId = null;
+            UUID commitTableId = null;
+            int commitPartitionId = ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+            for (ReadResult readResult : readResults) {
+                rowVersions.add(readResult.binaryRow().byteBuffer());
+
+                if (readResult.isWriteIntent()) {
+                    txId = readResult.transactionId();
+                    commitTableId = readResult.commitTableId();
+                    commitPartitionId = readResult.commitPartitionId();
+                } else {
+                    timestamps.add(readResult.commitTimestamp());
+                }
+            }
+
+            responseEntries.add(
+                    TABLE_MSG_FACTORY.responseEntry()
+                            .rowId(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits()))
+                            .rowVersions(rowVersions)
+                            .timestamps(timestamps)
+                            .txId(txId)
+                            .commitTableId(commitTableId)
+                            .commitPartitionId(commitPartitionId)
+                            .build()
+            );
+        }
+
+        return responseEntries;
+    }
+
+    /**
+     * Builds a binary row of {@link #SCHEMA_DESCRIPTOR} with the given key and value.
+     */
+    private static BinaryRow createBinaryRow(String key, String value) {
+        return new RowAssembler(SCHEMA_DESCRIPTOR, 1, 1).appendString(key).appendString(value).build();
+    }
+
+    /**
+     * Checks that every given row has the same versions in both storages. This covers the key and value columns,
+     * the commit timestamp, and the write-intent metadata.
+     */
+    private static void assertEqualsMvRows(MvPartitionStorage expected, MvPartitionStorage actual, List<RowId> rowIds) {
+        for (RowId rowId : rowIds) {
+            List<ReadResult> expReadResults = expected.scanVersions(rowId).stream().collect(toList());
+            List<ReadResult> actReadResults = actual.scanVersions(rowId).stream().collect(toList());
+
+            assertEquals(expReadResults.size(), actReadResults.size(), rowId.toString());
+
+            for (int i = 0; i < expReadResults.size(); i++) {
+                ReadResult expReadResult = expReadResults.get(i);
+                ReadResult actReadResult = actReadResults.get(i);
+
+                String msg = "RowId=" + rowId + ", i=" + i;
+
+                Row expRow = new Row(SCHEMA_DESCRIPTOR, expReadResult.binaryRow());
+                Row actRow = new Row(SCHEMA_DESCRIPTOR, actReadResult.binaryRow());
+
+                assertEquals(expRow.stringValue(0), actRow.stringValue(0), msg);
+                assertEquals(expRow.stringValue(1), actRow.stringValue(1), msg);
+
+                assertEquals(expReadResult.commitTimestamp(), actReadResult.commitTimestamp(), msg);
+                assertEquals(expReadResult.transactionId(), actReadResult.transactionId(), msg);
+                assertEquals(expReadResult.commitTableId(), actReadResult.commitTableId(), msg);
+                assertEquals(expReadResult.commitPartitionId(), actReadResult.commitPartitionId(), msg);
+                assertEquals(expReadResult.isWriteIntent(), actReadResult.isWriteIntent(), msg);
+            }
+        }
+    }
+
+    /**
+     * Checks that both storages hold equal transaction metas for every given transaction ID.
+     */
+    private static void assertEqualsTxStates(TxStateStorage expected, TxStateStorage actual, List<UUID> txIds) {
+        for (UUID txId : txIds) {
+            // Name the offending transaction on failure, like assertEqualsMvRows does for rows.
+            assertEquals(expected.get(txId), actual.get(txId), txId.toString());
+        }
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
index 22b6001e63..e5af7246d5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAcces
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
+import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -55,6 +57,9 @@ class OutgoingSnapshotTest {
@Mock
private PartitionAccess partitionAccess;
+ @Mock
+ private MvPartitionStorage mvPartitionStorage;
+
@Mock
private OutgoingSnapshotRegistry snapshotRegistry;
@@ -78,7 +83,9 @@ class OutgoingSnapshotTest {
@BeforeEach
void createTestInstance() {
- lenient().when(partitionAccess.key()).thenReturn(partitionKey);
+ lenient().when(partitionAccess.partitionKey()).thenReturn(partitionKey);
+
+ lenient().when(partitionAccess.mvPartitionStorage()).thenReturn(mvPartitionStorage);
snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, snapshotRegistry);
}
@@ -130,9 +137,9 @@ class OutgoingSnapshotTest {
}
private void configureStorageToHaveExactlyOneRowWith(List<ReadResult> versions) {
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(versions);
- lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(null);
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(versions));
+ lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(null);
}
private SnapshotMvDataResponse getMvDataResponse(long batchSizeHint) throws InterruptedException, ExecutionException, TimeoutException {
@@ -167,11 +174,11 @@ class OutgoingSnapshotTest {
ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1));
- when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
- when(partitionAccess.rowVersions(rowId2)).thenReturn(List.of(version2));
- when(partitionAccess.closestRowId(rowId3)).thenReturn(null);
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
+ when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
+ when(mvPartitionStorage.scanVersions(rowId2)).thenReturn(Cursor.fromIterable(List.of(version2)));
+ when(mvPartitionStorage.closestRowId(rowId3)).thenReturn(null);
SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
@@ -187,8 +194,8 @@ class OutgoingSnapshotTest {
void rowsWithIdsToSkipAreIgnored() throws Exception {
snapshot.addRowIdToSkip(rowId1);
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.closestRowId(rowId2)).thenReturn(null);
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(null);
SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
@@ -206,7 +213,7 @@ class OutgoingSnapshotTest {
clock.now()
);
- when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version2, version1));
+ when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version2, version1)));
snapshot.enqueueForSending(rowIdOutOfOrder);
@@ -230,7 +237,7 @@ class OutgoingSnapshotTest {
}
private void configureStorageToBeEmpty() {
- lenient().when(partitionAccess.closestRowId(lowestRowId)).thenReturn(null);
+ lenient().when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(null);
}
@Test
@@ -238,7 +245,7 @@ class OutgoingSnapshotTest {
ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
- when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1));
+ when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version1)));
snapshot.enqueueForSending(rowIdOutOfOrder);
@@ -257,11 +264,11 @@ class OutgoingSnapshotTest {
ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
- when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version2));
+ when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version2)));
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1));
- when(partitionAccess.closestRowId(rowId2)).then(invocation -> {
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
+ when(mvPartitionStorage.closestRowId(rowId2)).then(invocation -> {
snapshot.enqueueForSending(rowIdOutOfOrder);
return null;
});
@@ -333,8 +340,8 @@ class OutgoingSnapshotTest {
void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() throws Exception {
ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version));
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version)));
SnapshotMvDataResponse response = getMvDataResponse(1);
@@ -345,7 +352,7 @@ class OutgoingSnapshotTest {
void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() throws Exception {
ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version));
+ when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version)));
snapshot.enqueueForSending(rowIdOutOfOrder);
@@ -361,9 +368,9 @@ class OutgoingSnapshotTest {
ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1, version2));
- lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
+ lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
SnapshotMvDataResponse response = getMvDataResponse(1);
@@ -387,7 +394,7 @@ class OutgoingSnapshotTest {
void sendsRowsFromOutOfOrderQueueBiggerThanHint() throws Exception {
ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[1000]), clock.now());
- when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version));
+ when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version)));
snapshot.enqueueForSending(rowIdOutOfOrder);
@@ -410,9 +417,9 @@ class OutgoingSnapshotTest {
ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1, version2));
- lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
+ lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
getMvDataResponse(1);
@@ -424,9 +431,9 @@ class OutgoingSnapshotTest {
ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
- when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
- when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1, version2));
- lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+ when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+ when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
+ lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
getMvDataResponse(1);
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 88fe42149c..3142730989 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -94,6 +94,11 @@ public interface TxStateStorage extends AutoCloseable {
*/
long lastAppliedIndex();
+ /**
+ * Sets the last applied index value.
+ */
+ void lastAppliedIndex(long lastAppliedIndex);
+
/**
* {@link #lastAppliedIndex()} value consistent with the data, already persisted on the storage.
*/
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java
index c241328dd0..0e2bfb208a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.tx.storage.state;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.configuration.storage.StorageException;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.jetbrains.annotations.Nullable;
@@ -48,10 +47,9 @@ public interface TxStateTableStorage extends AutoCloseable {
* Destroy transaction state storage.
*
* @param partitionId Partition id.
- * @return Future.
* @throws StorageException In case when the operation has failed.
*/
- CompletableFuture<Void> destroyTxStateStorage(int partitionId) throws StorageException;
+ void destroyTxStateStorage(int partitionId) throws StorageException;
/**
* Table configuration.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index f130366fd7..dfc5fc0813 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -122,7 +122,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
persistedIndex = lastAppliedIndex;
}
- /** {@inheritDoc} */
@Override
public TxMeta get(UUID txId) {
if (!busyLock.enterBusy()) {
@@ -145,7 +144,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public void put(UUID txId, TxMeta txMeta) {
if (!busyLock.enterBusy()) {
@@ -166,7 +164,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta, long commandIndex) {
requireNonNull(txMeta);
@@ -221,7 +218,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public void remove(UUID txId) {
if (!busyLock.enterBusy()) {
@@ -242,7 +238,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
if (!busyLock.enterBusy()) {
@@ -297,19 +292,36 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> flush() {
return tableStorage.awaitFlush(true);
}
- /** {@inheritDoc} */
@Override
public long lastAppliedIndex() {
return lastAppliedIndex;
}
- /** {@inheritDoc} */
+ @Override
+ public void lastAppliedIndex(long lastAppliedIndex) {
+ if (!busyLock.enterBusy()) {
+ throwStorageStoppedException();
+ }
+
+ try {
+ db.put(lastAppliedIndexKey, longToBytes(lastAppliedIndex));
+
+ // Keep the cached value returned by lastAppliedIndex() in sync; only reached if the write succeeded.
+ this.lastAppliedIndex = lastAppliedIndex;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ "Failed to write applied index value to transaction state storage, partition " + partitionId
+ + " of table " + tableStorage.configuration().value().name(),
+ e
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
@Override
public long persistedIndex() {
return persistedIndex;
@@ -350,7 +362,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
return appliedIndexBytes == null ? 0 : bytesToLong(appliedIndexBytes);
}
- /** {@inheritDoc} */
@Override
public void destroy() {
try (WriteBatch writeBatch = new WriteBatch()) {
@@ -396,7 +407,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
return new UUID(msb, lsb);
}
- /** {@inheritDoc} */
@Override
public void close() throws Exception {
if (!closeGuard.compareAndSet(false, true)) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java
index ce4950c0d8..5cdbbba1e1 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.util.Collections.reverse;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.rocksdb.ReadTier.PERSISTED_TIER;
@@ -33,8 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.IntSupplier;
import org.apache.ignite.internal.configuration.storage.StorageException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.tostring.S;
@@ -56,9 +53,6 @@ import org.rocksdb.WriteOptions;
* RocksDb implementation of {@link TxStateTableStorage}.
*/
public class TxStateRocksDbTableStorage implements TxStateTableStorage {
- /** Logger. */
- private static final IgniteLogger LOG = Loggers.forClass(TxStateRocksDbTableStorage.class);
-
static {
RocksDB.loadLibrary();
}
@@ -144,8 +138,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
}
}
- /** {@inheritDoc} */
- @Override public TxStateStorage getOrCreateTxStateStorage(int partitionId) throws StorageException {
+ @Override
+ public TxStateStorage getOrCreateTxStateStorage(int partitionId) throws StorageException {
checkPartitionId(partitionId);
TxStateRocksDbStorage storage = storages.get(partitionId);
@@ -166,40 +160,36 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
return storage;
}
- /** {@inheritDoc} */
- @Override public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
+ @Override
+ public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
return storages.get(partitionId);
}
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> destroyTxStateStorage(int partitionId) throws StorageException {
+ @Override
+ public void destroyTxStateStorage(int partitionId) throws StorageException {
checkPartitionId(partitionId);
TxStateStorage storage = storages.getAndSet(partitionId, null);
- if (storage == null) {
- return completedFuture(null);
- }
+ if (storage != null) {
+ storage.destroy();
- storage.destroy();
-
- return awaitFlush(false).whenComplete((v, e) -> {
try {
storage.close();
- } catch (Exception ex) {
- LOG.error("Couldn't close the transaction state storage of partition "
- + partitionId + ", table " + tableCfg.value().name());
+ } catch (Exception e) {
+ // Chain the close failure as the cause so the underlying error is not lost.
+ throw new StorageException("Couldn't close the transaction state storage of partition "
+ + partitionId + ", table " + tableCfg.value().name(), e);
}
- });
+ }
}
- /** {@inheritDoc} */
- @Override public TableConfiguration configuration() {
+ @Override
+ public TableConfiguration configuration() {
return tableCfg;
}
- /** {@inheritDoc} */
- @Override public void start() throws StorageException {
+ @Override
+ public void start() throws StorageException {
try {
flusher = new RocksDbFlusher(
busyLock,
@@ -239,8 +229,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
}
}
- /** {@inheritDoc} */
- @Override public void stop() throws StorageException {
+ @Override
+ public void stop() throws StorageException {
if (!stopGuard.compareAndSet(false, true)) {
return;
}
@@ -271,8 +261,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
}
}
- /** {@inheritDoc} */
- @Override public void destroy() throws StorageException {
+ @Override
+ public void destroy() throws StorageException {
try (Options options = new Options()) {
close();
@@ -305,8 +295,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
}
}
- /** {@inheritDoc} */
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
stop();
}
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java
index 1909fd142d..db4a082b35 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java
@@ -37,19 +37,18 @@ public class TestConcurrentHashMapTxStateStorage implements TxStateStorage {
/** Storage. */
private final ConcurrentHashMap<UUID, TxMeta> storage = new ConcurrentHashMap<>();
- /** {@inheritDoc} */
+ private volatile long lastAppliedIndex;
+
@Override
public TxMeta get(UUID txId) {
return storage.get(txId);
}
- /** {@inheritDoc} */
@Override
public void put(UUID txId, TxMeta txMeta) {
storage.put(txId, txMeta);
}
- /** {@inheritDoc} */
@Override
public boolean compareAndSet(UUID txId, TxState txStateExpected, @NotNull TxMeta txMeta, long commandIndex) {
while (true) {
@@ -76,19 +75,16 @@ public class TestConcurrentHashMapTxStateStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public void remove(UUID txId) {
storage.remove(txId);
}
- /** {@inheritDoc} */
@Override
public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
return Cursor.fromIterator(storage.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).iterator());
}
- /** {@inheritDoc} */
@Override
public void destroy() {
try {
@@ -100,25 +96,26 @@ public class TestConcurrentHashMapTxStateStorage implements TxStateStorage {
}
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> flush() {
return completedFuture(null);
}
- /** {@inheritDoc} */
@Override
public long lastAppliedIndex() {
- return 0;
+ return lastAppliedIndex;
+ }
+
+ @Override
+ public void lastAppliedIndex(long lastAppliedIndex) {
+ this.lastAppliedIndex = lastAppliedIndex;
}
- /** {@inheritDoc} */
@Override
public long persistedIndex() {
- return 0;
+ return lastAppliedIndex;
}
- /** {@inheritDoc} */
@Override
public void close() throws Exception {
// No-op.
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java
index c46ff4a6ad..fdd6f91caa 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java
@@ -17,10 +17,7 @@
package org.apache.ignite.internal.tx.storage.state.test;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.configuration.storage.StorageException;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -34,49 +31,46 @@ import org.jetbrains.annotations.Nullable;
public class TestConcurrentHashMapTxStateTableStorage implements TxStateTableStorage {
private final Map<Integer, TxStateStorage> storages = new ConcurrentHashMap<>();
- /** {@inheritDoc} */
@Override public TxStateStorage getOrCreateTxStateStorage(int partitionId) throws StorageException {
return storages.computeIfAbsent(partitionId, k -> new TestConcurrentHashMapTxStateStorage());
}
- /** {@inheritDoc} */
- @Override public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
+ @Override
+ public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
return storages.get(partitionId);
}
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> destroyTxStateStorage(int partitionId) throws StorageException {
- TxStateStorage storage = storages.replace(partitionId, null);
+ @Override
+ public void destroyTxStateStorage(int partitionId) throws StorageException {
+ TxStateStorage storage = storages.remove(partitionId);
if (storage != null) {
storage.destroy();
}
-
- return completedFuture(null);
}
- /** {@inheritDoc} */
- @Override public TableConfiguration configuration() {
+ @Override
+ public TableConfiguration configuration() {
return null;
}
- /** {@inheritDoc} */
- @Override public void start() throws StorageException {
+ @Override
+ public void start() throws StorageException {
// No-op.
}
- /** {@inheritDoc} */
- @Override public void stop() throws StorageException {
+ @Override
+ public void stop() throws StorageException {
// No-op.
}
- /** {@inheritDoc} */
- @Override public void destroy() throws StorageException {
+ @Override
+ public void destroy() throws StorageException {
storages.clear();
}
- /** {@inheritDoc} */
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
stop();
}
}