You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/22 06:06:28 UTC

(ignite-3) branch main updated: IGNITE-21549 Add a wait for safeTtime(partition) before starting the index building process (#3242)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f3ef87aa4 IGNITE-21549 Add a wait for safeTtime(partition) before starting the index building process (#3242)
1f3ef87aa4 is described below

commit 1f3ef87aa4426a9dd378d608f5e0255554e9f132
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Feb 22 09:06:23 2024 +0300

    IGNITE-21549 Add a wait for safeTtime(partition) before starting the index building process (#3242)
---
 .../internal/table/distributed/TableUtils.java     | 28 ++++++++
 .../replicator/PartitionReplicaListener.java       | 14 ++++
 .../replication/PartitionReplicaListenerTest.java  | 78 +++++++++++++++++-----
 3 files changed, 105 insertions(+), 15 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
index 5b0fe734fb..3f1b8ca61e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
 import static org.apache.ignite.internal.util.CollectionUtils.view;
 
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.tx.TransactionIds;
@@ -53,4 +55,30 @@ public class TableUtils {
 
         return view(indexes, CatalogObjectDescriptor::id);
     }
+
+    /**
+     * Returns the first catalog version, counting up from the given one, where the index has status {@link CatalogIndexStatus#BUILDING}.
+     *
+     * @param catalogService Catalog service; its latest catalog version, read once up front, is the inclusive upper bound of the search.
+     * @param indexId Index ID of interest; the index is asserted to be present in every catalog version that gets checked.
+     * @param fromCatalogVersionIncluded Catalog version with which the search will begin (inclusive).
+     */
+    public static int findStartBuildingIndexCatalogVersion(CatalogService catalogService, int indexId, int fromCatalogVersionIncluded) {
+        int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+        for (int catalogVersion = fromCatalogVersionIncluded; catalogVersion <= latestCatalogVersion; catalogVersion++) {
+            CatalogIndexDescriptor index = catalogService.index(indexId, catalogVersion);
+            // Plain Java assert: checked only when assertions are enabled; otherwise a missing index surfaces as an NPE below.
+            assert index != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+
+            if (index.status() == BUILDING) {
+                return catalogVersion;
+            }
+        }
+        // Reached only when no checked catalog version has the index in BUILDING status.
+        throw new AssertionError(String.format(
+                "Could not find index in status %s: [indexId=%s, fromCatalogVersionIncluded=%s, latestCatalogVersion=%s]",
+                BUILDING, indexId, fromCatalogVersionIncluded, latestCatalogVersion
+        ));
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 879c551822..ac157fa270 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,8 +22,10 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.table.distributed.TableUtils.findStartBuildingIndexCatalogVersion;
 import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs;
 import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
 import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
@@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -3805,10 +3808,21 @@ public class PartitionReplicaListener implements ReplicaListener {
 
     private CompletableFuture<?> processBuildIndexReplicaRequest(BuildIndexReplicaRequest request) {
         return txRwOperationTracker.awaitCompleteTxRwOperations(request.creationCatalogVersion())
+                .thenCompose(unused -> safeTime.waitFor(indexStartBuildingActivationTs(request))) // gate the Raft command on safe time
                 .thenCompose(unused -> raftClient.run(toBuildIndexCommand(request)));
     }
 
     private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) {
         return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId, tableId());
     }
+
+    private HybridTimestamp indexStartBuildingActivationTs(BuildIndexReplicaRequest request) {
+        int catalogVersion = findStartBuildingIndexCatalogVersion(catalogService, request.indexId(), request.creationCatalogVersion());
+        // catalogVersion: first version, at or after the request's creationCatalogVersion(), in which the index is BUILDING.
+        Catalog catalog = catalogService.catalog(catalogVersion);
+
+        assert catalog != null : "indexId=" + request.indexId() + ", catalogVersion=" + catalogVersion;
+        // Presumably catalog.time() is that version's activation time (per this method's name); it is returned as a HybridTimestamp.
+        return hybridTimestamp(catalog.time());
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 67e40a345a..7858d3ccc1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -21,7 +21,9 @@ import static java.util.Collections.singletonList;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
 import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
@@ -85,6 +87,7 @@ import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.distributed.replicator.action.RequestTypes;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
 import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
@@ -2802,7 +2805,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     @ParameterizedTest(name = "readOnly = {0}")
     @ValueSource(booleans = {true, false})
     void testStaleTxOperationAfterIndexStartBuilding(boolean readOnly) {
-        fireHashIndexStartBuildingEventForStaleTxOperation();
+        fireHashIndexStartBuildingEventForStaleTxOperation(hashIndexStorage.id(), 1);
 
         UUID txId = newTxId();
         long beginTs = beginTimestamp(txId).longValue();
@@ -2820,14 +2823,20 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     void testBuildIndexReplicaRequestWithoutRwTxOperations() {
-        CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
+        int indexId = hashIndexStorage.id();
+        int indexCreationCatalogVersion = 1;
+        // Sent before the INDEX_BUILDING event is fired; the assertFalse below expects the request to still be pending.
+        CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(
+                indexId,
+                indexCreationCatalogVersion
+        );
 
         assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
 
-        fireHashIndexStartBuildingEventForStaleTxOperation();
+        fireHashIndexStartBuildingEventForStaleTxOperation(indexId, indexCreationCatalogVersion);
 
         assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully());
-        assertThat(invokeBuildIndexReplicaRequestAsync(1, 1), willCompleteSuccessfully());
+        assertThat(invokeBuildIndexReplicaRequestAsync(indexId, indexCreationCatalogVersion), willCompleteSuccessfully());
     }
 
     @ParameterizedTest(name = "failCmd = {0}")
@@ -2853,9 +2862,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         BinaryRow row = binaryRow(0);
 
         CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, true);
-        CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
 
-        fireHashIndexStartBuildingEventForStaleTxOperation();
+        int indexId = hashIndexStorage.id();
+        int indexCreationCatalogVersion = 1;
+
+        CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(
+                indexId,
+                indexCreationCatalogVersion
+        );
+
+        fireHashIndexStartBuildingEventForStaleTxOperation(indexId, indexCreationCatalogVersion);
 
         assertFalse(upsertFuture.isDone());
         assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
@@ -2871,22 +2887,54 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
 
         assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully());
+
+        HybridTimestamp startBuildingIndexActivationTs = hybridTimestamp(catalogService.catalog(indexCreationCatalogVersion + 1).time());
+
+        verify(safeTimeClock).waitFor(eq(startBuildingIndexActivationTs));
     }
 
-    private void fireHashIndexStartBuildingEventForStaleTxOperation() {
-        CatalogHashIndexDescriptor hashIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
+    private void fireHashIndexStartBuildingEventForStaleTxOperation(int indexId, int creationIndexCatalogVersion) {
+        var registeredIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
+        var buildingIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
 
-        int indexId = hashIndexStorage.id();
+        when(registeredIndexDescriptor.id()).thenReturn(indexId);
+        when(buildingIndexDescriptor.id()).thenReturn(indexId);
 
-        when(hashIndexDescriptor.id()).thenReturn(indexId);
-        when(hashIndexDescriptor.tableId()).thenReturn(TABLE_ID);
-        when(hashIndexDescriptor.status()).thenReturn(BUILDING);
-        when(hashIndexDescriptor.txWaitCatalogVersion()).thenReturn(1);
+        when(buildingIndexDescriptor.tableId()).thenReturn(TABLE_ID);
+        when(registeredIndexDescriptor.tableId()).thenReturn(TABLE_ID);
 
-        when(catalogService.index(eq(indexId), anyInt())).thenReturn(hashIndexDescriptor);
+        when(registeredIndexDescriptor.status()).thenReturn(REGISTERED);
+        when(buildingIndexDescriptor.status()).thenReturn(BUILDING);
+
+        when(buildingIndexDescriptor.txWaitCatalogVersion()).thenReturn(creationIndexCatalogVersion);
+
+        int startBuildingIndexCatalogVersion = creationIndexCatalogVersion + 1;
+        // Same index ID, two catalog versions: REGISTERED at the creation version, BUILDING at the next one.
+        when(catalogService.index(eq(indexId), eq(creationIndexCatalogVersion))).thenReturn(registeredIndexDescriptor);
+        when(catalogService.index(eq(indexId), eq(startBuildingIndexCatalogVersion))).thenReturn(buildingIndexDescriptor);
+
+        when(catalogService.latestCatalogVersion()).thenReturn(startBuildingIndexCatalogVersion);
+
+        var registeredIndexCatalog = mock(Catalog.class);
+        var buildingIndexCatalog = mock(Catalog.class);
+
+        when(registeredIndexCatalog.version()).thenReturn(creationIndexCatalogVersion);
+        when(buildingIndexCatalog.version()).thenReturn(startBuildingIndexCatalogVersion);
+        // Catalog times: REGISTERED is clock.now() shifted back by 100 (physical part), BUILDING is the current clock value.
+        long registeredIndexActivationTs = clock.now().addPhysicalTime(-100).longValue();
+        long buildingIndexActivationTs = clock.nowLong();
+
+        when(registeredIndexCatalog.time()).thenReturn(registeredIndexActivationTs);
+        when(buildingIndexCatalog.time()).thenReturn(buildingIndexActivationTs);
+
+        when(catalogService.catalog(eq(creationIndexCatalogVersion))).thenReturn(registeredIndexCatalog);
+        when(catalogService.catalog(eq(startBuildingIndexCatalogVersion))).thenReturn(buildingIndexCatalog);
 
         assertThat(
-                catalogServiceEventProducer.fireEvent(INDEX_BUILDING, new StartBuildingIndexEventParameters(0L, 2, indexId)),
+                catalogServiceEventProducer.fireEvent(
+                        INDEX_BUILDING,
+                        new StartBuildingIndexEventParameters(0L, startBuildingIndexCatalogVersion, indexId)
+                ),
                 willCompleteSuccessfully()
         );
     }