You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/17 10:12:39 UTC
(ignite-3) branch main updated: IGNITE-18595 Implement index build process during the full state transfer (#3215)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4468f976b5 IGNITE-18595 Implement index build process during the full state transfer (#3215)
4468f976b5 is described below
commit 4468f976b5db599ef0d205d8f30bbdd723df3a15
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Sat Feb 17 13:12:33 2024 +0300
IGNITE-18595 Implement index build process during the full state transfer (#3215)
---
.../ignite/internal/catalog/CatalogTestUtils.java | 2 +-
.../apache/ignite/internal/index/IndexBuilder.java | 1 -
.../impl/StandaloneMetaStorageManager.java | 20 +-
.../internal/table/distributed/TableManager.java | 24 +-
.../snapshot/FullStateTransferIndexChooser.java | 325 ++++++++++++++++++
.../distributed/raft/snapshot/PartitionAccess.java | 22 +-
.../raft/snapshot/PartitionAccessImpl.java | 22 +-
.../raft/snapshot/ReadOnlyIndexInfo.java | 104 ++++++
.../snapshot/incoming/IncomingSnapshotCopier.java | 6 +-
.../FullStateTransferIndexChooserTest.java | 373 +++++++++++++++++++++
.../raft/snapshot/PartitionAccessImplTest.java | 137 +++++---
.../incoming/IncomingSnapshotCopierTest.java | 29 +-
.../ignite/internal/table/TableTestUtils.java | 4 +
13 files changed, 974 insertions(+), 95 deletions(-)
diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 18146c6433..07f82632c7 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -62,7 +62,7 @@ public class CatalogTestUtils {
* @param clock Hybrid clock.
*/
public static CatalogManager createTestCatalogManager(String nodeName, HybridClock clock) {
- StandaloneMetaStorageManager metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(nodeName));
+ StandaloneMetaStorageManager metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(nodeName), clock);
var clockWaiter = new ClockWaiter(nodeName, clock);
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index daa3f51375..c9e8d0c32d 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -98,7 +98,6 @@ class IndexBuilder implements ManuallyCloseable {
* to the replica.
* @param creationCatalogVersion Catalog version in which the index was created.
*/
- // TODO: IGNITE-19498 Perhaps we need to start building the index only once
public void scheduleBuildIndex(
int tableId,
int partitionId,
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 763e317215..9d8dcd73a9 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
@@ -78,6 +79,17 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
* @param keyValueStorage Key-value storage.
*/
public static StandaloneMetaStorageManager create(KeyValueStorage keyValueStorage) {
+ return create(keyValueStorage, new HybridClockImpl());
+ }
+
+ /**
+ * Creates standalone MetaStorage manager for provided key-value storage. The manager is responsible for starting/stopping provided
+ * key-value storage.
+ *
+ * @param keyValueStorage Key-value storage.
+ * @param clock Clock.
+ */
+ public static StandaloneMetaStorageManager create(KeyValueStorage keyValueStorage, HybridClock clock) {
return new StandaloneMetaStorageManager(
mockClusterService(),
mockClusterGroupManager(),
@@ -85,7 +97,8 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
mockRaftManager(),
keyValueStorage,
mock(TopologyAwareRaftGroupServiceFactory.class),
- mockConfiguration()
+ mockConfiguration(),
+ clock
);
}
@@ -105,7 +118,8 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
RaftManager raftMgr,
KeyValueStorage storage,
TopologyAwareRaftGroupServiceFactory raftServiceFactory,
- MetaStorageConfiguration configuration
+ MetaStorageConfiguration configuration,
+ HybridClock clock
) {
super(
clusterService,
@@ -113,7 +127,7 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
logicalTopologyService,
raftMgr,
storage,
- new HybridClockImpl(),
+ clock,
raftServiceFactory,
configuration
);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4f440ce765..d0e972b101 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -156,6 +156,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
@@ -373,6 +374,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
/** Marshallers provider. */
private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
+ /** Index chooser for full state transfer. */
+ private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
/**
* Creates a new table manager.
*
@@ -523,17 +527,21 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
txStateStoragePool,
TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER
);
+
+ fullStateTransferIndexChooser = new FullStateTransferIndexChooser(catalogService);
}
@Override
public CompletableFuture<Void> start() {
- inBusyLock(busyLock, () -> {
+ return inBusyLockAsync(busyLock, () -> {
mvGc.start();
lowWatermark.start();
transactionStateResolver.start();
+ fullStateTransferIndexChooser.start();
+
CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture();
assert recoveryFinishFuture.isDone();
@@ -571,9 +579,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
});
partitionReplicatorNodeRecovery.start();
- });
- return nullCompletedFuture();
+ return nullCompletedFuture();
+ });
}
private void processAssignmentsOnRecovery(long recoveryRevision) {
@@ -1037,7 +1045,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
txStateTableStorage,
mvGc,
partitionUpdateHandlers.indexUpdateHandler,
- partitionUpdateHandlers.gcUpdateHandler
+ partitionUpdateHandlers.gcUpdateHandler,
+ fullStateTransferIndexChooser
),
catalogService,
incomingSnapshotsExecutor
@@ -1081,6 +1090,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
IgniteUtils.closeAllManually(
lowWatermark,
mvGc,
+ fullStateTransferIndexChooser,
() -> shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS),
() -> shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS),
() -> shutdownAndAwaitTermination(txStateStorageScheduledPool, 10, TimeUnit.SECONDS),
@@ -1150,7 +1160,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
long causalityToken,
int catalogVersion,
CatalogTableDescriptor tableDescriptor,
- // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
+ // TODO: IGNITE-19513 We need to do something different to wait for indexes before full rebalancing
boolean onNodeRecovery
) {
return inBusyLockAsync(busyLock, () -> {
@@ -1212,7 +1222,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
CatalogZoneDescriptor zoneDescriptor,
CompletableFuture<List<Set<Assignment>>> assignmentsFuture,
int catalogVersion,
- // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
+ // TODO: IGNITE-19513 We need to do something different to wait for indexes before full rebalancing
boolean onNodeRecovery
) {
String tableName = tableDescriptor.name();
@@ -1242,7 +1252,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
var table = new TableImpl(internalTable, lockMgr, schemaVersions, marshallers, sql.get());
- // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
+ // TODO: IGNITE-19513 We need to do something different to wait for indexes before full rebalancing
table.addIndexesToWait(collectTableIndexIds(tableId, catalogVersion, onNodeRecovery));
tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
new file mode 100644
index 0000000000..4fbdd81b54
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
+import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CollectionUtils.view;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index chooser for full state transfer. */
+// TODO: IGNITE-21502 Deal with the case of dropping a table
+// TODO: IGNITE-21502 Stop writing to a dropped index that was in a status preceding AVAILABLE
+// TODO: IGNITE-21514 Stop writing to indexes that are destroyed during catalog compaction
+public class FullStateTransferIndexChooser implements ManuallyCloseable {
+ private final CatalogService catalogService;
+
+ private final NavigableSet<ReadOnlyIndexInfo> readOnlyIndexes = new ConcurrentSkipListSet<>();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Constructor. */
+ public FullStateTransferIndexChooser(CatalogService catalogService) {
+ this.catalogService = catalogService;
+ }
+
+ /** Starts the component. */
+ public void start() {
+ inBusyLockSafe(busyLock, () -> {
+ addListenersBusy();
+
+ recoverReadOnlyIndexesBusy();
+ });
+ }
+
+ @Override
+ public void close() {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ readOnlyIndexes.clear();
+ }
+
+ /**
+ * Collects indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow, UUID, int, int, int)} (write intent).
+ *
+ * <p>Index selection algorithm:</p>
+ * <ul>
+ * <li>If the index in the snapshot catalog version is in status {@link CatalogIndexStatus#BUILDING},
+ * {@link CatalogIndexStatus#AVAILABLE} or {@link CatalogIndexStatus#STOPPING}.</li>
+ * <li>If the index is in status {@link CatalogIndexStatus#REGISTERED} and it is also in this status in the catalog version that is
+ * active at {@code beginTs}.</li>
+ * <li>For a read-only index, if {@code beginTs} is strictly less than the activation time of dropping the index.</li>
+ * </ul>
+ *
+ * @param catalogVersion Catalog version of the incoming partition snapshot.
+ * @param tableId Table ID for which indexes will be chosen.
+ * @param beginTs Begin timestamp of the transaction.
+ * @return List of index IDs sorted in ascending order.
+ */
+ public List<Integer> chooseForAddWrite(int catalogVersion, int tableId, HybridTimestamp beginTs) {
+ return inBusyLock(busyLock, () -> {
+ int activeCatalogVersionAtBeginTxTs = catalogService.activeCatalogVersion(beginTs.longValue());
+
+ List<Integer> fromCatalog = chooseFromCatalogBusy(catalogVersion, tableId, index -> {
+ if (index.status() == REGISTERED) {
+ CatalogIndexDescriptor indexAtBeginTs = catalogService.index(index.id(), activeCatalogVersionAtBeginTxTs);
+
+ return indexAtBeginTs != null && indexAtBeginTs.status() == REGISTERED;
+ }
+
+ return true;
+ });
+
+ List<Integer> fromReadOnlyIndexes = chooseFromReadOnlyIndexesBusy(tableId, beginTs);
+
+ return mergeWithoutDuplicates(fromCatalog, fromReadOnlyIndexes);
+ });
+ }
+
+ /**
+ * Collects indexes for {@link PartitionAccess#addWriteCommitted(RowId, BinaryRow, HybridTimestamp, int)} (write committed only).
+ *
+ * <p>Index selection algorithm:</p>
+ * <ul>
+ * <li>If the index in the snapshot catalog version is in status {@link CatalogIndexStatus#BUILDING},
+ * {@link CatalogIndexStatus#AVAILABLE} or {@link CatalogIndexStatus#STOPPING}.</li>
+ * <li>For a read-only index, if {@code commitTs} is strictly less than the activation time of dropping the index.</li>
+ * </ul>
+ *
+ * @param catalogVersion Catalog version of the incoming partition snapshot.
+ * @param tableId Table ID for which indexes will be chosen.
+ * @param commitTs Timestamp to associate with committed value.
+ * @return List of index IDs sorted in ascending order.
+ */
+ public List<Integer> chooseForAddWriteCommitted(int catalogVersion, int tableId, HybridTimestamp commitTs) {
+ return inBusyLock(busyLock, () -> {
+ List<Integer> fromCatalog = chooseFromCatalogBusy(catalogVersion, tableId, index -> index.status() != REGISTERED);
+
+ List<Integer> fromReadOnlyIndexes = chooseFromReadOnlyIndexesBusy(tableId, commitTs);
+
+ return mergeWithoutDuplicates(fromCatalog, fromReadOnlyIndexes);
+ });
+ }
+
+ private List<Integer> chooseFromCatalogBusy(int catalogVersion, int tableId, Predicate<CatalogIndexDescriptor> filter) {
+ List<CatalogIndexDescriptor> indexes = catalogService.indexes(catalogVersion, tableId);
+
+ if (indexes.isEmpty()) {
+ return List.of();
+ }
+
+ var result = new ArrayList<CatalogIndexDescriptor>(indexes.size());
+
+ for (CatalogIndexDescriptor index : indexes) {
+ switch (index.status()) {
+ case REGISTERED:
+ case BUILDING:
+ case AVAILABLE:
+ case STOPPING:
+ if (filter.test(index)) {
+ result.add(index);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown index status: " + index.status());
+ }
+ }
+
+ return view(result, CatalogObjectDescriptor::id);
+ }
+
+ private List<Integer> chooseFromReadOnlyIndexesBusy(int tableId, HybridTimestamp fromTsExcluded) {
+ ReadOnlyIndexInfo fromKeyIncluded = new ReadOnlyIndexInfo(tableId, fromTsExcluded.longValue() + 1, 0);
+ ReadOnlyIndexInfo toKeyExcluded = new ReadOnlyIndexInfo(tableId + 1, 0, 0);
+
+ NavigableSet<ReadOnlyIndexInfo> subSet = readOnlyIndexes.subSet(fromKeyIncluded, true, toKeyExcluded, false);
+
+ if (subSet.isEmpty()) {
+ return List.of();
+ }
+
+ return subSet.stream().map(ReadOnlyIndexInfo::indexId).sorted().collect(toList());
+ }
+
+ private static List<Integer> mergeWithoutDuplicates(List<Integer> l0, List<Integer> l1) {
+ if (l0.isEmpty()) {
+ return l1;
+ } else if (l1.isEmpty()) {
+ return l0;
+ }
+
+ var result = new ArrayList<Integer>(l0.size() + l1.size());
+
+ for (int i0 = 0, i1 = 0; i0 < l0.size() || i1 < l1.size(); ) {
+ if (i0 >= l0.size()) {
+ result.add(l1.get(i1++));
+ } else if (i1 >= l1.size()) {
+ result.add(l0.get(i0++));
+ } else {
+ Integer indexId0 = l0.get(i0);
+ Integer indexId1 = l1.get(i1);
+
+ if (indexId0 < indexId1) {
+ result.add(indexId0);
+ i0++;
+ } else if (indexId0 > indexId1) {
+ result.add(indexId1);
+ i1++;
+ } else {
+ result.add(indexId0);
+ i0++;
+ i1++;
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private void addListenersBusy() {
+ catalogService.listen(INDEX_REMOVED, (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ return onIndexRemoved((RemoveIndexEventParameters) parameters).thenApply(unused -> false);
+ });
+ }
+
+ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexId();
+ int catalogVersion = parameters.catalogVersion();
+
+ CatalogIndexDescriptor index = indexBusy(indexId, catalogVersion - 1);
+
+ if (index.status() == AVAILABLE) {
+ // On drop table event.
+ readOnlyIndexes.add(new ReadOnlyIndexInfo(index, catalogActivationTimestampBusy(catalogVersion)));
+ } else if (index.status() == STOPPING) {
+ readOnlyIndexes.add(new ReadOnlyIndexInfo(index, findStoppingActivationTsBusy(indexId, catalogVersion - 1)));
+ }
+
+ return nullCompletedFuture();
+ });
+ }
+
+ private long catalogActivationTimestampBusy(int catalogVersion) {
+ Catalog catalog = catalogService.catalog(catalogVersion);
+
+ assert catalog != null : catalogVersion;
+
+ return catalog.time();
+ }
+
+ private void recoverReadOnlyIndexesBusy() {
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+ var readOnlyIndexById = new HashMap<Integer, ReadOnlyIndexInfo>();
+ var previousCatalogVersionTableIds = Set.<Integer>of();
+
+ // TODO: IGNITE-21514 Deal with catalog compaction
+ for (int catalogVersion = earliestCatalogVersion; catalogVersion <= latestCatalogVersion; catalogVersion++) {
+ long activationTs = catalogActivationTimestampBusy(catalogVersion);
+
+ catalogService.indexes(catalogVersion).stream()
+ .filter(index -> index.status() == STOPPING)
+ .forEach(index -> readOnlyIndexById.computeIfAbsent(index.id(), i -> new ReadOnlyIndexInfo(index, activationTs)));
+
+ Set<Integer> currentCatalogVersionTableIds = tableIds(catalogVersion);
+
+ // Here we look for indices that transitioned directly from AVAILABLE to [deleted] (corresponding to the logical READ_ONLY
+ // state) as such transitions only happen when a table is dropped.
+ int finalCatalogVersion = catalogVersion;
+ difference(previousCatalogVersionTableIds, currentCatalogVersionTableIds).stream()
+ .flatMap(droppedTableId -> catalogService.indexes(finalCatalogVersion - 1, droppedTableId).stream())
+ .filter(index -> index.status() == AVAILABLE)
+ .forEach(index -> readOnlyIndexById.computeIfAbsent(index.id(), i -> new ReadOnlyIndexInfo(index, activationTs)));
+
+ previousCatalogVersionTableIds = currentCatalogVersionTableIds;
+ }
+
+ readOnlyIndexes.addAll(readOnlyIndexById.values());
+ }
+
+ private CatalogIndexDescriptor indexBusy(int indexId, int catalogVersion) {
+ CatalogIndexDescriptor index = catalogService.index(indexId, catalogVersion);
+
+ assert index != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+
+ return index;
+ }
+
+ private long findStoppingActivationTsBusy(int indexId, int toCatalogVersionIncluded) {
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+
+ for (int catalogVersion = toCatalogVersionIncluded; catalogVersion >= earliestCatalogVersion; catalogVersion--) {
+ if (indexBusy(indexId, catalogVersion).status() == AVAILABLE) {
+ return catalogActivationTimestampBusy(catalogVersion + 1);
+ }
+ }
+
+ throw new AssertionError(format(
+ "{} status activation timestamp was not found for index: [indexId={}, toCatalogVersionIncluded={}]",
+ STOPPING, indexId, toCatalogVersionIncluded
+ ));
+ }
+
+ private Set<Integer> tableIds(int catalogVersion) {
+ return catalogService.tables(catalogVersion).stream().map(CatalogObjectDescriptor::id).collect(toSet());
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index e6f8230a24..06e25d346a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -85,32 +85,34 @@ public interface PartitionAccess {
@Nullable RaftGroupConfiguration committedGroupConfiguration();
/**
- * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id. In details: - if there is no
+ * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction ID. In details: - if there is no
* uncommitted version, a new uncommitted version is added - if there is an uncommitted version belonging to the same transaction, it
* gets replaced by the given version - if there is an uncommitted version belonging to a different transaction,
* {@link TxIdMismatchException} is thrown
*
- * @param rowId Row id.
+ * @param rowId Row ID.
* @param row Table row to update. Key only row means value removal.
- * @param txId Transaction id.
- * @param commitTableId Commit table id.
+ * @param txId Transaction ID.
+ * @param commitTableId Commit table ID.
* @param commitPartitionId Commit partitionId.
- * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
+ * @param catalogVersion Catalog version of the incoming partition snapshot.
+ * @throws TxIdMismatchException If there's another pending update associated with different transaction ID.
* @throws StorageException If failed to write data.
*/
- void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId);
+ void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId, int catalogVersion);
/**
* Creates a committed version. In details: - if there is no uncommitted version, a new committed version is added - if there is an
* uncommitted version, this method may fail with a system exception (this method should not be called if there is already something
* uncommitted for the given row).
*
- * @param rowId Row id.
+ * @param rowId Row ID.
* @param row Table row to update. Key only row means value removal.
* @param commitTimestamp Timestamp to associate with committed value.
+ * @param catalogVersion Catalog version of the incoming partition snapshot.
* @throws StorageException If failed to write data.
*/
- void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp);
+ void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp, int catalogVersion);
/**
* Returns the minimum applied index of the partition storages.
@@ -147,8 +149,8 @@ public interface PartitionAccess {
* <li>{@link #maxLastAppliedTerm()};</li>
* <li>{@link #committedGroupConfiguration()};</li>
* <li>{@link #addTxMeta(UUID, TxMeta)};</li>
- * <li>{@link #addWrite(RowId, BinaryRow, UUID, int, int)};</li>
- * <li>{@link #addWriteCommitted(RowId, BinaryRow, HybridTimestamp)}.</li>
+ * <li>{@link #addWrite(RowId, BinaryRow, UUID, int, int, int)};</li>
+ * <li>{@link #addWriteCommitted(RowId, BinaryRow, HybridTimestamp, int)}.</li>
* </ul></li>
* </ul>
*
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index cddedf3bb1..933576e9c3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import java.util.List;
import java.util.UUID;
@@ -60,6 +61,8 @@ public class PartitionAccessImpl implements PartitionAccess {
private final GcUpdateHandler gcUpdateHandler;
+ private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
/**
* Constructor.
*
@@ -69,6 +72,7 @@ public class PartitionAccessImpl implements PartitionAccess {
* @param mvGc Garbage collector for multi-versioned storages and their indexes in the background.
* @param indexUpdateHandler Index update handler.
* @param gcUpdateHandler Gc update handler.
+ * @param fullStateTransferIndexChooser Index chooser for full state transfer.
*/
public PartitionAccessImpl(
PartitionKey partitionKey,
@@ -76,7 +80,8 @@ public class PartitionAccessImpl implements PartitionAccess {
TxStateTableStorage txStateTableStorage,
MvGc mvGc,
IndexUpdateHandler indexUpdateHandler,
- GcUpdateHandler gcUpdateHandler
+ GcUpdateHandler gcUpdateHandler,
+ FullStateTransferIndexChooser fullStateTransferIndexChooser
) {
this.partitionKey = partitionKey;
this.mvTableStorage = mvTableStorage;
@@ -84,6 +89,7 @@ public class PartitionAccessImpl implements PartitionAccess {
this.mvGc = mvGc;
this.indexUpdateHandler = indexUpdateHandler;
this.gcUpdateHandler = gcUpdateHandler;
+ this.fullStateTransferIndexChooser = fullStateTransferIndexChooser;
}
@Override
@@ -135,32 +141,34 @@ public class PartitionAccessImpl implements PartitionAccess {
}
@Override
- public void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId) {
+ public void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId, int catalogVersion) {
MvPartitionStorage mvPartitionStorage = getMvPartitionStorage(partitionId());
+ List<Integer> indexIds = fullStateTransferIndexChooser.chooseForAddWrite(catalogVersion, tableId(), beginTimestamp(txId));
+
mvPartitionStorage.runConsistently(locker -> {
locker.lock(rowId);
mvPartitionStorage.addWrite(rowId, row, txId, commitTableId, commitPartitionId);
- // TODO: IGNITE-18595 We need to know the indexes for a full rebalance, i.e. null must go
- indexUpdateHandler.addToIndexes(row, rowId, null);
+ indexUpdateHandler.addToIndexes(row, rowId, indexIds);
return null;
});
}
@Override
- public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) {
+ public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp, int catalogVersion) {
MvPartitionStorage mvPartitionStorage = getMvPartitionStorage(partitionId());
+ List<Integer> indexIds = fullStateTransferIndexChooser.chooseForAddWriteCommitted(catalogVersion, tableId(), commitTimestamp);
+
mvPartitionStorage.runConsistently(locker -> {
locker.lock(rowId);
mvPartitionStorage.addWriteCommitted(rowId, row, commitTimestamp);
- // TODO: IGNITE-18595 We need to know the indexes for a full rebalance, i.e. null must go
- indexUpdateHandler.addToIndexes(row, rowId, null);
+ indexUpdateHandler.addToIndexes(row, rowId, indexIds);
return null;
});
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/ReadOnlyIndexInfo.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/ReadOnlyIndexInfo.java
new file mode 100644
index 0000000000..9fc733fbc1
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/ReadOnlyIndexInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Immutable description of a read-only index, used internally by {@link FullStateTransferIndexChooser}.
+ *
+ * <p>Instances are ordered by table ID, then by activation timestamp, then by index ID. This ordering is consistent with
+ * {@link #equals(Object)}.
+ */
+final class ReadOnlyIndexInfo implements Comparable<ReadOnlyIndexInfo> {
+    private final int tableId;
+
+    /**
+     * Timestamp of activation of the catalog version in which the index got the status {@link CatalogIndexStatus#STOPPING} or the table was
+     * dropped and the index had the status {@link CatalogIndexStatus#AVAILABLE}.
+     */
+    private final long activationTs;
+
+    private final int indexId;
+
+    /**
+     * Creates an instance that takes the table ID and the index ID from a catalog index descriptor.
+     *
+     * @param index Catalog index descriptor.
+     * @param activationTs Activation timestamp, see {@link #activationTs()}.
+     */
+    public ReadOnlyIndexInfo(CatalogIndexDescriptor index, long activationTs) {
+        this(index.tableId(), activationTs, index.id());
+    }
+
+    /**
+     * Creates an instance from raw values.
+     *
+     * @param tableId Table ID.
+     * @param activationTs Activation timestamp, see {@link #activationTs()}.
+     * @param indexId Index ID.
+     */
+    ReadOnlyIndexInfo(int tableId, long activationTs, int indexId) {
+        this.tableId = tableId;
+        this.activationTs = activationTs;
+        this.indexId = indexId;
+    }
+
+    /** Returns the ID of the table the index belongs to. */
+    int tableId() {
+        return tableId;
+    }
+
+    /** Returns the activation timestamp stored for this index. */
+    long activationTs() {
+        return activationTs;
+    }
+
+    /** Returns the index ID. */
+    int indexId() {
+        return indexId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        // The class is final, so an instanceof test accepts exactly the same objects as a getClass() comparison would.
+        if (!(o instanceof ReadOnlyIndexInfo)) {
+            return false;
+        }
+
+        ReadOnlyIndexInfo that = (ReadOnlyIndexInfo) o;
+
+        return indexId == that.indexId && activationTs == that.activationTs && tableId == that.tableId;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * (31 * tableId + Long.hashCode(activationTs)) + indexId;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    @Override
+    public int compareTo(ReadOnlyIndexInfo other) {
+        int byTable = Integer.compare(tableId, other.tableId);
+
+        if (byTable != 0) {
+            return byTable;
+        }
+
+        int byActivation = Long.compare(activationTs, other.activationTs);
+
+        return byActivation != 0 ? byActivation : Integer.compare(indexId, other.indexId);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 72eb1c4aaf..ab59b244cf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -492,16 +492,18 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
PartitionAccess partition = partitionSnapshotStorage.partition();
+ int snapshotCatalogVersion = snapshotMeta.requiredCatalogVersion();
+
if (i == entry.timestamps().length) {
// Writes an intent to write (uncommitted version).
assert entry.txId() != null;
assert entry.commitTableId() != null;
assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
- partition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
+ partition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId(), snapshotCatalogVersion);
} else {
// Writes committed version.
- partition.addWriteCommitted(rowId, binaryRow, hybridTimestamp(entry.timestamps()[i]));
+ partition.addWriteCommitted(rowId, binaryRow, hybridTimestamp(entry.timestamps()[i]), snapshotCatalogVersion);
}
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
new file mode 100644
index 0000000000..7e82ed370b
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.table.TableTestUtils.INDEX_NAME;
+import static org.apache.ignite.internal.table.TableTestUtils.PK_INDEX_NAME;
+import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
+import static org.apache.ignite.internal.table.TableTestUtils.createSimpleHashIndex;
+import static org.apache.ignite.internal.table.TableTestUtils.createSimpleTable;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static org.apache.ignite.internal.table.TableTestUtils.makeIndexAvailable;
+import static org.apache.ignite.internal.table.TableTestUtils.startBuildingIndex;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Tests which index IDs {@link FullStateTransferIndexChooser} selects for write intents and for committed writes. */
+public class FullStateTransferIndexChooserTest extends BaseIgniteAbstractTest {
+    private static final String REGISTERED_INDEX_NAME = INDEX_NAME + "_" + REGISTERED;
+
+    private static final String BUILDING_INDEX_NAME = INDEX_NAME + "_" + BUILDING;
+
+    private static final String AVAILABLE_INDEX_NAME = INDEX_NAME + "_" + AVAILABLE;
+
+    private static final String STOPPING_INDEX_NAME = INDEX_NAME + "_" + STOPPING;
+
+    private static final String READ_ONLY_INDEX_NAME = INDEX_NAME + "_READ_ONLY";
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private CatalogManager catalogManager;
+
+    private FullStateTransferIndexChooser indexChooser;
+
+    /** Starts the catalog manager, then the chooser, and finally creates the test table. */
+    @BeforeEach
+    void setUp() {
+        catalogManager = CatalogTestUtils.createTestCatalogManager("test", clock);
+
+        indexChooser = new FullStateTransferIndexChooser(catalogManager);
+
+        assertThat(catalogManager.start(), willCompleteSuccessfully());
+
+        indexChooser.start();
+
+        createSimpleTable(catalogManager, TABLE_NAME);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closeAllManually(
+                indexChooser,
+                catalogManager::beforeNodeStop,
+                catalogManager::stop
+        );
+    }
+
+    /** With the latest catalog version and the maximal begin timestamp, write intents get the PK index and secondary indexes in every status. */
+    @Test
+    void chooseForAddWriteWithSecondaryAndWithoutReadOnlyIndexes() {
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId));
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        int registeredIndexId = indexId(REGISTERED_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId));
+
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        int buildingIndexId = indexId(BUILDING_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId, buildingIndexId));
+
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId, buildingIndexId, availableIndexId));
+
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId, buildingIndexId, availableIndexId, stoppingIndexId));
+    }
+
+    /**
+     * Only the PK index is expected for a write intent in two cases: the begin timestamp was taken before the index was created, or the
+     * chooser is asked with the catalog version captured before the index build was started.
+     */
+    @Test
+    void chooseForAddWriteWithSecondaryAndWithoutReadOnlyAndRegisteredIndexes() {
+        HybridTimestamp beginTsBeforeCreateRegisteredIndex = clock.now();
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int registeredIndexId = indexId(REGISTERED_INDEX_NAME);
+
+        assertThat(chooseForAddWriteLatest(beginTsBeforeCreateRegisteredIndex), contains(pkIndexId));
+
+        int catalogVersionBeforeStartBuildingIndex = latestCatalogVersion();
+
+        startBuildingIndex(catalogManager, registeredIndexId);
+
+        assertThat(
+                indexChooser.chooseForAddWrite(catalogVersionBeforeStartBuildingIndex, tableId(TABLE_NAME), clock.now()),
+                contains(pkIndexId)
+        );
+    }
+
+    /** Committed writes get the PK index plus building, available and stopping secondary indexes; a registered index is not chosen. */
+    @Test
+    void chooseForAddWriteCommittedWithSecondaryAndWithoutReadOnlyIndexes() {
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId));
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId));
+
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        int buildingIndexId = indexId(BUILDING_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId, buildingIndexId));
+
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId, buildingIndexId, availableIndexId));
+
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId, buildingIndexId, availableIndexId, stoppingIndexId));
+    }
+
+    /**
+     * An index that has been removed from the catalog is still expected for a commit timestamp taken before the index was dropped. It is
+     * not expected for the activation timestamp of the catalog version that was latest right after the drop, nor for a later timestamp.
+     * The same expectations are checked after the chooser is re-created.
+     */
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteCommittedWithSecondaryAndReadOnlyIndexes(boolean recovery) {
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+
+        HybridTimestamp commitTsBeforeStoppingIndex = clock.now();
+
+        createSimpleStoppingIndex(READ_ONLY_INDEX_NAME);
+
+        HybridTimestamp commitTsOnStoppingIndex = latestCatalogVersionActivationTs();
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int readOnlyIndexId = indexId(READ_ONLY_INDEX_NAME);
+
+        dropIndex(REGISTERED_INDEX_NAME);
+        dropIndex(BUILDING_INDEX_NAME);
+        removeIndex(READ_ONLY_INDEX_NAME);
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(chooseForAddWriteCommittedLatest(commitTsBeforeStoppingIndex), contains(pkIndexId, readOnlyIndexId));
+        assertThat(chooseForAddWriteCommittedLatest(commitTsOnStoppingIndex), contains(pkIndexId));
+        assertThat(chooseForAddWriteCommittedLatest(clock.now()), contains(pkIndexId));
+    }
+
+    /** Same scenario as the committed-write variant, but for write intents keyed by the transaction begin timestamp. */
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteWithSecondaryAndReadOnlyIndexes(boolean recovery) {
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+
+        HybridTimestamp beginTsBeforeStoppingIndex = clock.now();
+
+        createSimpleStoppingIndex(READ_ONLY_INDEX_NAME);
+
+        HybridTimestamp beginTsOnStoppingIndex = latestCatalogVersionActivationTs();
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int readOnlyIndexId = indexId(READ_ONLY_INDEX_NAME);
+
+        dropIndex(REGISTERED_INDEX_NAME);
+        dropIndex(BUILDING_INDEX_NAME);
+        removeIndex(READ_ONLY_INDEX_NAME);
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(chooseForAddWriteLatest(beginTsBeforeStoppingIndex), contains(pkIndexId, readOnlyIndexId));
+        assertThat(chooseForAddWriteLatest(beginTsOnStoppingIndex), contains(pkIndexId));
+        assertThat(chooseForAddWriteLatest(clock.now()), contains(pkIndexId));
+    }
+
+    /**
+     * After the table is dropped, the expected indexes depend on the commit timestamp: PK, available and stopping for a timestamp taken
+     * before the secondary indexes were created; PK and available for a timestamp taken just before the drop; none for a timestamp taken
+     * after the drop. The same expectations are checked after the chooser is re-created.
+     */
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteCommittedForDroppedTable(boolean recovery) {
+        HybridTimestamp commitTsBeforeCreateIndexes = clock.now();
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+
+        // The table ID is resolved before the drop because the helper looks the table up by name in the catalog.
+        int tableId = tableId(TABLE_NAME);
+
+        HybridTimestamp commitTsBeforeDropTable = clock.now();
+
+        dropTable();
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(
+                chooseForAddWriteCommittedLatest(tableId, commitTsBeforeCreateIndexes),
+                contains(pkIndexId, availableIndexId, stoppingIndexId)
+        );
+
+        assertThat(
+                chooseForAddWriteCommittedLatest(tableId, commitTsBeforeDropTable),
+                contains(pkIndexId, availableIndexId)
+        );
+
+        assertThat(chooseForAddWriteCommittedLatest(tableId, clock.now()), empty());
+    }
+
+    /** Same scenario as the committed-write variant for a dropped table, but for write intents keyed by the begin timestamp. */
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteForDroppedTable(boolean recovery) {
+        HybridTimestamp beginTsBeforeCreateIndexes = clock.now();
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+
+        // The table ID is resolved before the drop because the helper looks the table up by name in the catalog.
+        int tableId = tableId(TABLE_NAME);
+
+        HybridTimestamp beginTsBeforeDropTable = clock.now();
+
+        dropTable();
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(
+                chooseForAddWriteLatest(tableId, beginTsBeforeCreateIndexes),
+                contains(pkIndexId, availableIndexId, stoppingIndexId)
+        );
+
+        assertThat(
+                chooseForAddWriteLatest(tableId, beginTsBeforeDropTable),
+                contains(pkIndexId, availableIndexId)
+        );
+
+        assertThat(chooseForAddWriteLatest(tableId, clock.now()), empty());
+    }
+
+    /** Asks the chooser for committed-write indexes of the given table, using the latest catalog version. */
+    private List<Integer> chooseForAddWriteCommittedLatest(int tableId, HybridTimestamp commitTs) {
+        return indexChooser.chooseForAddWriteCommitted(latestCatalogVersion(), tableId, commitTs);
+    }
+
+    /** Same as the overload with a table ID, for the test table. */
+    private List<Integer> chooseForAddWriteCommittedLatest(HybridTimestamp commitTs) {
+        return chooseForAddWriteCommittedLatest(tableId(TABLE_NAME), commitTs);
+    }
+
+    /** Same as the overload with a commit timestamp, using {@link HybridTimestamp#MAX_VALUE}. */
+    private List<Integer> chooseForAddWriteCommittedLatest() {
+        return chooseForAddWriteCommittedLatest(HybridTimestamp.MAX_VALUE);
+    }
+
+    /** Asks the chooser for write-intent indexes of the given table, using the latest catalog version. */
+    private List<Integer> chooseForAddWriteLatest(int tableId, HybridTimestamp beginTs) {
+        return indexChooser.chooseForAddWrite(latestCatalogVersion(), tableId, beginTs);
+    }
+
+    /** Same as the overload with a table ID, for the test table. */
+    private List<Integer> chooseForAddWriteLatest(HybridTimestamp beginTs) {
+        return chooseForAddWriteLatest(tableId(TABLE_NAME), beginTs);
+    }
+
+    /** Same as the overload with a begin timestamp, using {@link HybridTimestamp#MAX_VALUE}. */
+    private List<Integer> chooseForAddWriteLatest() {
+        return chooseForAddWriteLatest(HybridTimestamp.MAX_VALUE);
+    }
+
+    /** Creates a hash index on the test table and performs no further status transitions. */
+    private void createSimpleRegisteredIndex(String indexName) {
+        createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
+    }
+
+    /** Creates a hash index on the test table and starts building it. */
+    private void createSimpleBuildingIndex(String indexName) {
+        createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
+        startBuildingIndex(catalogManager, indexId(indexName));
+    }
+
+    /** Creates a building index and then makes it available. */
+    private void createSimpleAvailableIndex(String indexName) {
+        createSimpleBuildingIndex(indexName);
+        makeIndexAvailable(catalogManager, indexId(indexName));
+    }
+
+    /** Creates an available index and then drops it by name. */
+    private void createSimpleStoppingIndex(String indexName) {
+        createSimpleAvailableIndex(indexName);
+        dropIndex(indexName);
+    }
+
+    private void removeIndex(String indexName) {
+        TableTestUtils.removeIndex(catalogManager, indexName);
+    }
+
+    private void dropIndex(String indexName) {
+        TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
+    }
+
+    private int latestCatalogVersion() {
+        return catalogManager.latestCatalogVersion();
+    }
+
+    /** Returns the {@code time()} of the latest catalog version as a hybrid timestamp; fails if that catalog is missing. */
+    private HybridTimestamp latestCatalogVersionActivationTs() {
+        int catalogVersion = catalogManager.latestCatalogVersion();
+
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+
+        assertNotNull(catalog, "catalogVersion=" + catalogVersion);
+
+        return hybridTimestamp(catalog.time());
+    }
+
+    /** Resolves the table ID by name at the current clock time. */
+    private int tableId(String tableName) {
+        return getTableIdStrict(catalogManager, tableName, clock.nowLong());
+    }
+
+    /** Resolves the index ID by name at the current clock time. */
+    private int indexId(String indexName) {
+        return getIndexIdStrict(catalogManager, indexName, clock.nowLong());
+    }
+
+    /** Closes the current chooser and starts a new one over the same catalog manager. */
+    private void recoverIndexChooser() {
+        indexChooser.close();
+        indexChooser = new FullStateTransferIndexChooser(catalogManager);
+        indexChooser.start();
+    }
+
+    private void dropTable() {
+        TableTestUtils.dropTable(catalogManager, DEFAULT_SCHEMA_NAME, TABLE_NAME);
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
index 64c6f40025..0d70af7f4e 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
@@ -18,16 +18,21 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
+import static org.apache.ignite.internal.tx.TransactionIds.transactionId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -41,12 +46,11 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.junit.jupiter.api.Test;
-/**
- * For {@link PartitionAccessImpl} testing.
- */
+/** For {@link PartitionAccessImpl} testing. */
public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
private static final int TABLE_ID = 1;
@@ -60,14 +64,7 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
MvPartitionStorage mvPartitionStorage = createMvPartition(mvTableStorage, TEST_PARTITION_ID);
TxStateStorage txStateStorage = txStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION_ID);
- PartitionAccess partitionAccess = new PartitionAccessImpl(
- testPartitionKey(),
- mvTableStorage,
- txStateTableStorage,
- mock(MvGc.class),
- mock(IndexUpdateHandler.class),
- mock(GcUpdateHandler.class)
- );
+ PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, txStateTableStorage);
assertEquals(0, partitionAccess.minLastAppliedIndex());
assertEquals(0, partitionAccess.maxLastAppliedIndex());
@@ -106,14 +103,7 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
MvPartitionStorage mvPartitionStorage = createMvPartition(mvTableStorage, TEST_PARTITION_ID);
TxStateStorage txStateStorage = txStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION_ID);
- PartitionAccess partitionAccess = new PartitionAccessImpl(
- testPartitionKey(),
- mvTableStorage,
- txStateTableStorage,
- mock(MvGc.class),
- mock(IndexUpdateHandler.class),
- mock(GcUpdateHandler.class)
- );
+ PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, txStateTableStorage);
assertEquals(0, partitionAccess.minLastAppliedTerm());
assertEquals(0, partitionAccess.maxLastAppliedTerm());
@@ -148,36 +138,41 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
IndexUpdateHandler indexUpdateHandler = mock(IndexUpdateHandler.class);
- PartitionAccess partitionAccess = new PartitionAccessImpl(
- testPartitionKey(),
- mvTableStorage,
- new TestTxStateTableStorage(),
- mock(MvGc.class),
- indexUpdateHandler,
- mock(GcUpdateHandler.class)
- );
+ FullStateTransferIndexChooser fullStateTransferIndexChooser = mock(FullStateTransferIndexChooser.class);
+
+ PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, indexUpdateHandler, fullStateTransferIndexChooser);
+
+ List<Integer> indexIds = List.of(1);
+
+ when(fullStateTransferIndexChooser.chooseForAddWrite(anyInt(), anyInt(), any())).thenReturn(indexIds);
RowId rowId = new RowId(TEST_PARTITION_ID);
BinaryRow binaryRow = mock(BinaryRow.class);
- UUID txId = UUID.randomUUID();
+ UUID txId = transactionId(hybridTimestamp(System.currentTimeMillis()), 1);
int commitTableId = 999;
+ int snapshotCatalogVersion = 666;
+ HybridTimestamp beginTs = beginTimestamp(txId);
+
+ partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID, snapshotCatalogVersion);
- partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID);
+ verify(mvPartitionStorage).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
- verify(mvPartitionStorage, times(1)).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
+ verify(fullStateTransferIndexChooser).chooseForAddWrite(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(beginTs));
- verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+ verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
// Let's check with a null binaryRow.
binaryRow = null;
- reset(mvPartitionStorage, indexUpdateHandler);
+ clearInvocations(mvPartitionStorage, indexUpdateHandler, fullStateTransferIndexChooser);
- partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID);
+ partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID, snapshotCatalogVersion);
- verify(mvPartitionStorage, times(1)).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
+ verify(mvPartitionStorage).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
- verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+ verify(fullStateTransferIndexChooser).chooseForAddWrite(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(beginTs));
+
+ verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
}
@Test
@@ -188,34 +183,39 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
IndexUpdateHandler indexUpdateHandler = mock(IndexUpdateHandler.class);
- PartitionAccess partitionAccess = new PartitionAccessImpl(
- testPartitionKey(),
- mvTableStorage,
- new TestTxStateTableStorage(),
- mock(MvGc.class),
- indexUpdateHandler,
- mock(GcUpdateHandler.class)
- );
+ FullStateTransferIndexChooser fullStateTransferIndexChooser = mock(FullStateTransferIndexChooser.class);
+
+ PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, indexUpdateHandler, fullStateTransferIndexChooser);
+
+ List<Integer> indexIds = List.of(1);
+
+ when(fullStateTransferIndexChooser.chooseForAddWriteCommitted(anyInt(), anyInt(), any())).thenReturn(indexIds);
RowId rowId = new RowId(TEST_PARTITION_ID);
BinaryRow binaryRow = mock(BinaryRow.class);
+ HybridTimestamp commitTimestamp = HybridTimestamp.MIN_VALUE.addPhysicalTime(100500);
+ int snapshotCatalogVersion = 666;
- partitionAccess.addWriteCommitted(rowId, binaryRow, HybridTimestamp.MAX_VALUE);
+ partitionAccess.addWriteCommitted(rowId, binaryRow, commitTimestamp, snapshotCatalogVersion);
- verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId), eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
+ verify(mvPartitionStorage).addWriteCommitted(eq(rowId), eq(binaryRow), eq(commitTimestamp));
- verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+ verify(fullStateTransferIndexChooser).chooseForAddWriteCommitted(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(commitTimestamp));
+
+ verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
// Let's check with a null binaryRow.
binaryRow = null;
- reset(mvPartitionStorage, indexUpdateHandler);
+ clearInvocations(mvPartitionStorage, indexUpdateHandler, fullStateTransferIndexChooser);
+
+ partitionAccess.addWriteCommitted(rowId, binaryRow, commitTimestamp, snapshotCatalogVersion);
- partitionAccess.addWriteCommitted(rowId, binaryRow, HybridTimestamp.MAX_VALUE);
+ verify(mvPartitionStorage).addWriteCommitted(eq(rowId), eq(binaryRow), eq(commitTimestamp));
- verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId), eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
+ verify(fullStateTransferIndexChooser).chooseForAddWriteCommitted(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(commitTimestamp));
- verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+ verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
}
private static MvPartitionStorage createMvPartition(MvTableStorage tableStorage, int partitionId) {
@@ -225,4 +225,35 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
return createMvPartitionFuture.join();
}
+
+    /** Creates a {@link PartitionAccessImpl} over the given storages; every other collaborator is a fresh Mockito mock. */
+    private static PartitionAccessImpl createPartitionAccessImpl(
+            MvTableStorage mvTableStorage,
+            TxStateTableStorage txStateTableStorage
+    ) {
+        MvGc mvGc = mock(MvGc.class);
+        IndexUpdateHandler indexUpdateHandler = mock(IndexUpdateHandler.class);
+        GcUpdateHandler gcUpdateHandler = mock(GcUpdateHandler.class);
+        FullStateTransferIndexChooser indexChooser = mock(FullStateTransferIndexChooser.class);
+
+        return new PartitionAccessImpl(
+                testPartitionKey(),
+                mvTableStorage,
+                txStateTableStorage,
+                mvGc,
+                indexUpdateHandler,
+                gcUpdateHandler,
+                indexChooser
+        );
+    }
+
+    /**
+     * Creates a {@link PartitionAccessImpl} with the supplied index update handler and index chooser, a new
+     * {@link TestTxStateTableStorage}, and Mockito mocks for the remaining collaborators.
+     */
+    private static PartitionAccessImpl createPartitionAccessImpl(
+            MvTableStorage mvTableStorage,
+            IndexUpdateHandler indexUpdateHandler,
+            FullStateTransferIndexChooser fullStateTransferIndexChooser
+    ) {
+        TestTxStateTableStorage txStateTableStorage = new TestTxStateTableStorage();
+        MvGc mvGc = mock(MvGc.class);
+        GcUpdateHandler gcUpdateHandler = mock(GcUpdateHandler.class);
+
+        return new PartitionAccessImpl(
+                testPartitionKey(),
+                mvTableStorage,
+                txStateTableStorage,
+                mvGc,
+                indexUpdateHandler,
+                gcUpdateHandler,
+                fullStateTransferIndexChooser
+        );
+    }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 89a534e11c..24f5fd881b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
@@ -92,6 +93,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -130,7 +132,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
new Column[]{new Column("value", NativeTypes.stringOf(256), false)}
);
- private static final HybridClock HYBRID_CLOCK = new HybridClockImpl();
+ private static final HybridClock CLOCK = new HybridClockImpl();
private static final TableMessagesFactory TABLE_MSG_FACTORY = new TableMessagesFactory();
@@ -314,7 +316,8 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
incomingTxStateTableStorage,
mvGc,
mock(IndexUpdateHandler.class),
- mock(GcUpdateHandler.class)
+ mock(GcUpdateHandler.class),
+ mock(FullStateTransferIndexChooser.class)
)),
catalogService,
mock(SnapshotMeta.class),
@@ -336,10 +339,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
for (int i = 0; i < rowIds.size(); i++) {
if (i % 2 == 0) {
// Writes committed version.
- storage.addWriteCommitted(rowIds.get(i), createRow("k" + i, "v" + i), HYBRID_CLOCK.now());
+ storage.addWriteCommitted(rowIds.get(i), createRow("k" + i, "v" + i), CLOCK.now());
} else {
// Writes an intent to write (uncommitted version).
- storage.addWrite(rowIds.get(i), createRow("k" + i, "v" + i), UUID.randomUUID(), 999, TEST_PARTITION);
+ storage.addWrite(rowIds.get(i), createRow("k" + i, "v" + i), generateTxId(), 999, TEST_PARTITION);
}
}
@@ -362,7 +365,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
for (int i = 0; i < txIds.size(); i++) {
TxState txState = i % 2 == 0 ? COMMITTED : ABORTED;
- storage.put(txIds.get(i), new TxMeta(txState, HYBRID_CLOCK.now()));
+ storage.put(txIds.get(i), new TxMeta(txState, CLOCK.now()));
}
storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
@@ -564,7 +567,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
return null;
}).when(partitionSnapshotStorage.partition())
- .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt());
+ .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt(), anyInt());
// Let's start rebalancing.
SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom(
@@ -608,7 +611,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
// Let's add an error on the rebalance.
doThrow(StorageException.class).when(partitionSnapshotStorage.partition())
- .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt());
+ .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt(), anyInt());
// Let's start rebalancing.
SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom(
@@ -658,10 +661,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
private static List<UUID> generateTxIds() {
return List.of(
- UUID.randomUUID(),
- UUID.randomUUID(),
- UUID.randomUUID(),
- UUID.randomUUID()
+ generateTxId(),
+ generateTxId(),
+ generateTxId(),
+ generateTxId()
);
}
@@ -731,4 +734,8 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
assertFalse(incomingMvPartitionStorage.scan(HybridTimestamp.MAX_VALUE).hasNext());
assertFalse(incomingTxStatePartitionStorage.scan().hasNext());
}
+
+ private static UUID generateTxId() {
+ return TransactionIds.transactionId(CLOCK.now(), 1);
+ }
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index bc0e92bb55..dc4588069d 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,6 +51,9 @@ public class TableTestUtils {
/** Index name. */
public static final String INDEX_NAME = "TEST_INDEX";
+ /** Name of the primary key index for {@link #TABLE_NAME}. */
+ public static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
/** Column name. */
public static final String COLUMN_NAME = "TEST_COLUMN";