You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/22 06:06:28 UTC
(ignite-3) branch main updated: IGNITE-21549 Add a wait for safeTtime(partition) before starting the index building process (#3242)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1f3ef87aa4 IGNITE-21549 Add a wait for safeTtime(partition) before starting the index building process (#3242)
1f3ef87aa4 is described below
commit 1f3ef87aa4426a9dd378d608f5e0255554e9f132
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Feb 22 09:06:23 2024 +0300
IGNITE-21549 Add a wait for safeTtime(partition) before starting the index building process (#3242)
---
.../internal/table/distributed/TableUtils.java | 28 ++++++++
.../replicator/PartitionReplicaListener.java | 14 ++++
.../replication/PartitionReplicaListenerTest.java | 78 +++++++++++++++++-----
3 files changed, 105 insertions(+), 15 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
index 5b0fe734fb..3f1b8ca61e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.table.distributed;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
import static org.apache.ignite.internal.util.CollectionUtils.view;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tx.TransactionIds;
@@ -53,4 +55,30 @@ public class TableUtils {
return view(indexes, CatalogObjectDescriptor::id);
}
+
+ /**
+ * Returns the catalog version in which the index got status {@link CatalogIndexStatus#BUILDING}.
+ *
+ * @param catalogService Catalog service.
+ * @param indexId Index ID of interest.
+ * @param fromCatalogVersionIncluded Catalog version with which the search will begin (inclusive).
+ */
+ public static int findStartBuildingIndexCatalogVersion(CatalogService catalogService, int indexId, int fromCatalogVersionIncluded) {
+ int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+ for (int catalogVersion = fromCatalogVersionIncluded; catalogVersion <= latestCatalogVersion; catalogVersion++) {
+ CatalogIndexDescriptor index = catalogService.index(indexId, catalogVersion);
+
+ assert index != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+
+ if (index.status() == BUILDING) {
+ return catalogVersion;
+ }
+ }
+
+ throw new AssertionError(String.format(
+ "Could not find index in status %s: [indexId=%s, fromCatalogVersionIncluded=%s, latestCatalogVersion=%s]",
+ BUILDING, indexId, fromCatalogVersionIncluded, latestCatalogVersion
+ ));
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 879c551822..ac157fa270 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,8 +22,10 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.table.distributed.TableUtils.findStartBuildingIndexCatalogVersion;
import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs;
import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
@@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -3805,10 +3808,21 @@ public class PartitionReplicaListener implements ReplicaListener {
private CompletableFuture<?> processBuildIndexReplicaRequest(BuildIndexReplicaRequest request) {
return txRwOperationTracker.awaitCompleteTxRwOperations(request.creationCatalogVersion())
+ .thenCompose(unused -> safeTime.waitFor(indexStartBuildingActivationTs(request)))
.thenCompose(unused -> raftClient.run(toBuildIndexCommand(request)));
}
private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) {
return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId, tableId());
}
+
+ private HybridTimestamp indexStartBuildingActivationTs(BuildIndexReplicaRequest request) {
+ int catalogVersion = findStartBuildingIndexCatalogVersion(catalogService, request.indexId(), request.creationCatalogVersion());
+
+ Catalog catalog = catalogService.catalog(catalogVersion);
+
+ assert catalog != null : "indexId=" + request.indexId() + ", catalogVersion=" + catalogVersion;
+
+ return hybridTimestamp(catalog.time());
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 67e40a345a..7858d3ccc1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -21,7 +21,9 @@ import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
@@ -85,6 +87,7 @@ import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.distributed.replicator.action.RequestTypes;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
@@ -2802,7 +2805,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@ParameterizedTest(name = "readOnly = {0}")
@ValueSource(booleans = {true, false})
void testStaleTxOperationAfterIndexStartBuilding(boolean readOnly) {
- fireHashIndexStartBuildingEventForStaleTxOperation();
+ fireHashIndexStartBuildingEventForStaleTxOperation(hashIndexStorage.id(), 1);
UUID txId = newTxId();
long beginTs = beginTimestamp(txId).longValue();
@@ -2820,14 +2823,20 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
void testBuildIndexReplicaRequestWithoutRwTxOperations() {
- CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
+ int indexId = hashIndexStorage.id();
+ int indexCreationCatalogVersion = 1;
+
+ CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(
+ indexId,
+ indexCreationCatalogVersion
+ );
assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
- fireHashIndexStartBuildingEventForStaleTxOperation();
+ fireHashIndexStartBuildingEventForStaleTxOperation(indexId, indexCreationCatalogVersion);
assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully());
- assertThat(invokeBuildIndexReplicaRequestAsync(1, 1), willCompleteSuccessfully());
+ assertThat(invokeBuildIndexReplicaRequestAsync(indexId, indexCreationCatalogVersion), willCompleteSuccessfully());
}
@ParameterizedTest(name = "failCmd = {0}")
@@ -2853,9 +2862,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
BinaryRow row = binaryRow(0);
CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, true);
- CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
- fireHashIndexStartBuildingEventForStaleTxOperation();
+ int indexId = hashIndexStorage.id();
+ int indexCreationCatalogVersion = 1;
+
+ CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(
+ indexId,
+ indexCreationCatalogVersion
+ );
+
+ fireHashIndexStartBuildingEventForStaleTxOperation(indexId, indexCreationCatalogVersion);
assertFalse(upsertFuture.isDone());
assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
@@ -2871,22 +2887,54 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully());
+
+ HybridTimestamp startBuildingIndexActivationTs = hybridTimestamp(catalogService.catalog(indexCreationCatalogVersion + 1).time());
+
+ verify(safeTimeClock).waitFor(eq(startBuildingIndexActivationTs));
}
- private void fireHashIndexStartBuildingEventForStaleTxOperation() {
- CatalogHashIndexDescriptor hashIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
+ private void fireHashIndexStartBuildingEventForStaleTxOperation(int indexId, int creationIndexCatalogVersion) {
+ var registeredIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
+ var buildingIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
- int indexId = hashIndexStorage.id();
+ when(registeredIndexDescriptor.id()).thenReturn(indexId);
+ when(buildingIndexDescriptor.id()).thenReturn(indexId);
- when(hashIndexDescriptor.id()).thenReturn(indexId);
- when(hashIndexDescriptor.tableId()).thenReturn(TABLE_ID);
- when(hashIndexDescriptor.status()).thenReturn(BUILDING);
- when(hashIndexDescriptor.txWaitCatalogVersion()).thenReturn(1);
+ when(buildingIndexDescriptor.tableId()).thenReturn(TABLE_ID);
+ when(registeredIndexDescriptor.tableId()).thenReturn(TABLE_ID);
- when(catalogService.index(eq(indexId), anyInt())).thenReturn(hashIndexDescriptor);
+ when(registeredIndexDescriptor.status()).thenReturn(REGISTERED);
+ when(buildingIndexDescriptor.status()).thenReturn(BUILDING);
+
+ when(buildingIndexDescriptor.txWaitCatalogVersion()).thenReturn(creationIndexCatalogVersion);
+
+ int startBuildingIndexCatalogVersion = creationIndexCatalogVersion + 1;
+
+ when(catalogService.index(eq(indexId), eq(creationIndexCatalogVersion))).thenReturn(registeredIndexDescriptor);
+ when(catalogService.index(eq(indexId), eq(startBuildingIndexCatalogVersion))).thenReturn(buildingIndexDescriptor);
+
+ when(catalogService.latestCatalogVersion()).thenReturn(startBuildingIndexCatalogVersion);
+
+ var registeredIndexCatalog = mock(Catalog.class);
+ var buildingIndexCatalog = mock(Catalog.class);
+
+ when(registeredIndexCatalog.version()).thenReturn(creationIndexCatalogVersion);
+ when(buildingIndexCatalog.version()).thenReturn(startBuildingIndexCatalogVersion);
+
+ long registeredIndexActivationTs = clock.now().addPhysicalTime(-100).longValue();
+ long buildingIndexActivationTs = clock.nowLong();
+
+ when(registeredIndexCatalog.time()).thenReturn(registeredIndexActivationTs);
+ when(buildingIndexCatalog.time()).thenReturn(buildingIndexActivationTs);
+
+ when(catalogService.catalog(eq(creationIndexCatalogVersion))).thenReturn(registeredIndexCatalog);
+ when(catalogService.catalog(eq(startBuildingIndexCatalogVersion))).thenReturn(buildingIndexCatalog);
assertThat(
- catalogServiceEventProducer.fireEvent(INDEX_BUILDING, new StartBuildingIndexEventParameters(0L, 2, indexId)),
+ catalogServiceEventProducer.fireEvent(
+ INDEX_BUILDING,
+ new StartBuildingIndexEventParameters(0L, startBuildingIndexCatalogVersion, indexId)
+ ),
willCompleteSuccessfully()
);
}