You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/11/23 08:36:08 UTC
(ignite-3) branch main updated: IGNITE-20863 Selecting indexes when performing update operations for partition (#2859)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3f80b7e066 IGNITE-20863 Selecting indexes when performing update operations for partition (#2859)
3f80b7e066 is described below
commit 3f80b7e066f7b0658fb38af7a0ea5957b2de2c64
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Nov 23 11:36:03 2023 +0300
IGNITE-20863 Selecting indexes when performing update operations for partition (#2859)
---
.../internal/catalog/CatalogManagerImpl.java | 5 +
.../ignite/internal/catalog/CatalogService.java | 7 +-
.../apache/ignite/internal/index/IndexChooser.java | 307 +++++++++++++++++++++
.../internal/index/TableIdCatalogVersion.java | 69 +++++
.../ignite/internal/index/IndexChooserTest.java | 287 +++++++++++++++++++
.../internal/index/TestIndexManagementUtils.java | 3 +
6 files changed, 675 insertions(+), 3 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 2903aa8a8a..8d8a5c9e82 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -292,6 +292,11 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
return catalogAt(timestamp).version();
}
+ @Override
+ public int earliestCatalogVersion() {
+ return catalogByVer.firstEntry().getKey();
+ }
+
@Override
public int latestCatalogVersion() {
return catalogByVer.lastEntry().getKey();
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 11e6528a01..befd21383a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -88,9 +88,10 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent
int activeCatalogVersion(long timestamp);
- /**
- * Returns the latest registered version of the catalog.
- */
+ /** Returns the earliest registered version of the catalog. */
+ int earliestCatalogVersion();
+
+ /** Returns the latest registered version of the catalog. */
int latestCatalogVersion();
/**
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java
new file mode 100644
index 0000000000..29ca18dbd3
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index chooser for various operations, for example for RW transactions. */
+class IndexChooser implements ManuallyCloseable {
+ private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = comparingInt(CatalogObjectDescriptor::id);
+
+ private final CatalogService catalogService;
+
+ /**
+ * Map that, for each table ID and catalog version key, contains the list (sorted by {@link #INDEX_COMPARATOR}) of all available
+ * indexes of that table that were dropped in that catalog version or in any earlier known catalog version.
+ *
+ * <p>Examples below will be within the same table ID.</p>
+ *
+ * <p>Let's look at an example, let's say we have the following versions of a catalog with indexes:</p>
+ * <pre>
+ * 0: I0(A) I1(A)
+ * 1: I0(A)
+ * 2: I0(A) I2(R) I3(A)
+ * 3: I0(A)
+ * 4: I0(A)
+ * </pre>
+ *
+ * <p>Then the map will have the following values:</p>
+ * <pre>
+ * 1 -> [I1(A)]
+ * 3 -> [I1(A), I3(A)]
+ * </pre>
+ *
+ * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting dropped available indexes}, we will return the following:</p>
+ * <pre>
+ * 0 -> []
+ * 1 -> [I1(A)]
+ * 2 -> [I1(A)]
+ * 3 -> [I1(A), I3(A)]
+ * 4 -> [I1(A), I3(A)]
+ * </pre>
+ *
+ * <p>Updated on {@link #recover() node recovery} and on catalog event processing.</p>
+ */
+ // TODO: IGNITE-20121 We may need to worry about parallel map changes when deleting catalog version
+ // TODO: IGNITE-20934 Worry about cleaning up dropped indexes earlier
+ private final NavigableMap<TableIdCatalogVersion, List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+ = new ConcurrentSkipListMap<>();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Constructor. */
+ IndexChooser(CatalogService catalogService) {
+ this.catalogService = catalogService;
+
+ addListeners();
+ }
+
+ /** Recovers internal structures on node recovery. */
+ void recover() {
+ inBusyLock(busyLock, () -> {
+ // It is expected that this method will be called only on recovery, before the metastore watches have been deployed.
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+ // At the moment, we will only use tables from the latest version (not dropped), since so far only replicas for them are started
+ // on the node.
+ for (CatalogTableDescriptor table : catalogService.tables(latestCatalogVersion)) {
+ int tableId = table.id();
+
+ for (int catalogVersion = earliestCatalogVersion; catalogVersion < latestCatalogVersion; catalogVersion++) {
+ int nextCatalogVersion = catalogVersion + 1;
+
+ List<CatalogIndexDescriptor> tableIndexes = catalogService.indexes(catalogVersion, tableId);
+
+ if (tableIndexes.isEmpty()) {
+ // Table does not exist yet.
+ continue;
+ }
+
+ List<CatalogIndexDescriptor> nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, tableId);
+
+ assert !nextCatalogVersionTableIndexes.isEmpty()
+ : String.format("Table should not be dropped: [catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+ for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+ if (tableIndex.available() && !contains(nextCatalogVersionTableIndexes, tableIndex)) {
+ addDroppedAvailableIndex(tableIndex, nextCatalogVersion);
+ }
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ droppedAvailableTableIndexes.clear();
+ }
+
+ /**
+ * Collects a list of table indexes that will need to be used for an update operation in an RW transaction. The list consists of all
+ * indexes (available and registered) on the requested catalog version, as well as all dropped available indexes from previous catalog
+ * versions.
+ *
+ * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. The table is expected to exist in the catalog at the requested
+ * version.</p>
+ *
+ * @param catalogVersion Catalog version.
+ * @param tableId Table ID.
+ */
+ List<CatalogIndexDescriptor> chooseForRwTxUpdateOperation(int catalogVersion, int tableId) {
+ return inBusyLock(busyLock, () -> {
+ List<CatalogIndexDescriptor> tableIndexes = catalogService.indexes(catalogVersion, tableId);
+
+ assert !tableIndexes.isEmpty() : "catalogVersion=" + catalogVersion + ", tableId=" + tableId;
+
+ List<CatalogIndexDescriptor> droppedAvailableTableIndexes = getDroppedAvailableIndexes(catalogVersion, tableId);
+
+ if (droppedAvailableTableIndexes.isEmpty()) {
+ return tableIndexes;
+ }
+
+ return unmodifiableList(merge(tableIndexes, droppedAvailableTableIndexes));
+ });
+ }
+
+ private void addListeners() {
+ catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ return onDropIndex((DropIndexEventParameters) parameters).thenApply(unused -> false);
+ });
+
+ catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ return onDropTable((DropTableEventParameters) parameters).thenApply(unused -> false);
+ });
+ }
+
+ private CompletableFuture<?> onDropIndex(DropIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int previousCatalogVersion = parameters.catalogVersion() - 1;
+
+ CatalogIndexDescriptor droppedIndexDescriptor = catalogService.index(parameters.indexId(), previousCatalogVersion);
+
+ assert droppedIndexDescriptor != null : "indexId=" + parameters.indexId() + ", catalogVersion=" + previousCatalogVersion;
+
+ if (!droppedIndexDescriptor.available()) {
+ return completedFuture(null);
+ }
+
+ addDroppedAvailableIndex(droppedIndexDescriptor, parameters.catalogVersion());
+
+ return completedFuture(null);
+ });
+ }
+
+ private CompletableFuture<?> onDropTable(DropTableEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ // We can remove dropped indexes on table drop as we need such indexes only for writing, and write operations will be denied
+ // right after a table drop has been activated.
+ droppedAvailableTableIndexes.entrySet().removeIf(entry -> parameters.tableId() == entry.getKey().tableId);
+
+ return completedFuture(null);
+ });
+ }
+
+ /**
+ * Returns a list of dropped available indexes (sorted by {@link #INDEX_COMPARATOR}) for the catalog version of interest from
+ * {@link #droppedAvailableTableIndexes}. If there is no list for the requested catalog version, the list for the closest previous
+ * catalog version of the same table will be returned, or an empty list if there is none.
+ */
+ private List<CatalogIndexDescriptor> getDroppedAvailableIndexes(int catalogVersion, int tableId) {
+ var key = new TableIdCatalogVersion(tableId, catalogVersion);
+
+ Entry<TableIdCatalogVersion, List<CatalogIndexDescriptor>> entry = droppedAvailableTableIndexes.floorEntry(key);
+
+ return entry != null && tableId == entry.getKey().tableId ? entry.getValue() : List.of();
+ }
+
+ /**
+ * Adds the dropped available index to {@link #droppedAvailableTableIndexes}.
+ *
+ * <p>If the list is missing for the catalog version from the arguments, then we create it by merging the indexes from the previous
+ * catalog version and the new index. Otherwise, we simply add to the existing list. Lists are sorted by {@link #INDEX_COMPARATOR}.</p>
+ *
+ * @param droppedIndex Dropped index.
+ * @param catalogVersion Catalog version on which the index was dropped.
+ */
+ private void addDroppedAvailableIndex(CatalogIndexDescriptor droppedIndex, int catalogVersion) {
+ assert droppedIndex.available() : droppedIndex.id();
+
+ int tableId = droppedIndex.tableId();
+
+ // For now, there is no need to worry about parallel changes to the map: it is only changed on recovery and in catalog event
+ // listeners, and these changes won't interfere with each other.
+ // TODO: IGNITE-20121 We may need to worry about parallel map changes when deleting catalog version
+ List<CatalogIndexDescriptor> previousCatalogVersionDroppedIndexes = getDroppedAvailableIndexes(catalogVersion - 1, tableId);
+
+ droppedAvailableTableIndexes.compute(
+ new TableIdCatalogVersion(tableId, catalogVersion),
+ (tableIdCatalogVersion, droppedAvailableIndexes) -> {
+ List<CatalogIndexDescriptor> res;
+
+ if (droppedAvailableIndexes == null) {
+ res = new ArrayList<>(1 + previousCatalogVersionDroppedIndexes.size());
+
+ res.addAll(previousCatalogVersionDroppedIndexes);
+ } else {
+ res = new ArrayList<>(1 + droppedAvailableIndexes.size());
+
+ res.addAll(droppedAvailableIndexes);
+ }
+
+ res.add(droppedIndex);
+
+ res.sort(INDEX_COMPARATOR);
+
+ return unmodifiableList(res);
+ });
+ }
+
+ private static boolean contains(List<CatalogIndexDescriptor> indexes, CatalogIndexDescriptor index) {
+ return binarySearch(indexes, index, INDEX_COMPARATOR) >= 0;
+ }
+
+ private static List<CatalogIndexDescriptor> merge(List<CatalogIndexDescriptor> list0, List<CatalogIndexDescriptor> list1) {
+ assert !list0.isEmpty();
+ assert !list1.isEmpty();
+
+ var res = new ArrayList<CatalogIndexDescriptor>(list0.size() + list1.size());
+
+ for (int i = 0, i0 = 0, i1 = 0; i < list0.size() + list1.size(); i++) {
+ if (i0 >= list0.size()) {
+ res.add(list1.get(i1++));
+ } else if (i1 >= list1.size()) {
+ res.add(list0.get(i0++));
+ } else {
+ CatalogIndexDescriptor index0 = list0.get(i0);
+ CatalogIndexDescriptor index1 = list1.get(i1);
+
+ if (INDEX_COMPARATOR.compare(index0, index1) <= 0) {
+ res.add(index0);
+ i0++;
+ } else {
+ res.add(index1);
+ i1++;
+ }
+ }
+ }
+
+ return res;
+ }
+}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/TableIdCatalogVersion.java b/modules/index/src/main/java/org/apache/ignite/internal/index/TableIdCatalogVersion.java
new file mode 100644
index 0000000000..9bdcc77813
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/TableIdCatalogVersion.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import org.apache.ignite.internal.tostring.S;
+
+/** Helper class for storing a pair of table ID and catalog version. */
+class TableIdCatalogVersion implements Comparable<TableIdCatalogVersion> {
+ final int tableId;
+
+ final int catalogVersion;
+
+ TableIdCatalogVersion(int tableId, int catalogVersion) {
+ this.tableId = tableId;
+ this.catalogVersion = catalogVersion;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TableIdCatalogVersion that = (TableIdCatalogVersion) o;
+
+ return tableId == that.tableId && catalogVersion == that.catalogVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tableId;
+ result = 31 * result + catalogVersion;
+ return result;
+ }
+
+ @Override
+ public int compareTo(TableIdCatalogVersion o) {
+ int cmp = Integer.compare(tableId, o.tableId);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return Integer.compare(catalogVersion, o.catalogVersion);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(TableIdCatalogVersion.class, this);
+ }
+}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java
new file mode 100644
index 0000000000..09aff7766c
--- /dev/null
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.PK_INDEX_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.indexId;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.tableId;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
+import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** For {@link IndexChooser} testing. */
+public class IndexChooserTest extends BaseIgniteAbstractTest {
+ private final HybridClock clock = new HybridClockImpl();
+
+ private final CatalogManager catalogManager = createTestCatalogManager(NODE_NAME, clock);
+
+ private IndexChooser indexChooser = new IndexChooser(catalogManager);
+
+ private int tableId;
+
+ private int catalogVersionAfterCreateTable;
+
+ @BeforeEach
+ void setUp() {
+ catalogManager.start();
+
+ createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
+
+ tableId = tableId(catalogManager, TABLE_NAME, clock);
+
+ catalogVersionAfterCreateTable = catalogManager.latestCatalogVersion();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(catalogManager::stop, indexChooser::close);
+ }
+
+ @ParameterizedTest(name = "withRecovery = {0}")
+ @ValueSource(booleans = {false, true})
+ void testChooseForRwTxOperationAfterCreateTable(boolean withRecovery) {
+ int catalogVersion = catalogVersionAfterCreateTable;
+
+ if (withRecovery) {
+ recoverIndexCollector();
+ }
+
+ assertThat(
+ chooseForRwTxOperation(catalogVersion),
+ contains(index(catalogVersion, PK_INDEX_NAME))
+ );
+ }
+
+ @ParameterizedTest(name = "withRecovery = {0}")
+ @ValueSource(booleans = {false, true})
+ void testChooseForRwTxOperationAfterCreateIndex(boolean withRecovery) {
+ createIndex(INDEX_NAME);
+
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ if (withRecovery) {
+ recoverIndexCollector();
+ }
+
+ assertThat(
+ chooseForRwTxOperation(catalogVersion),
+ contains(index(catalogVersion, PK_INDEX_NAME), index(catalogVersion, INDEX_NAME))
+ );
+ }
+
+ @ParameterizedTest(name = "withRecovery = {0}")
+ @ValueSource(booleans = {false, true})
+ void testChooseForRwTxOperationAfterMakeIndexAvailable(boolean withRecovery) {
+ createIndex(INDEX_NAME);
+ makeIndexAvailable(INDEX_NAME);
+
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ if (withRecovery) {
+ recoverIndexCollector();
+ }
+
+ assertThat(
+ chooseForRwTxOperation(catalogVersion),
+ contains(index(catalogVersion, PK_INDEX_NAME), index(catalogVersion, INDEX_NAME))
+ );
+ }
+
+ @ParameterizedTest(name = "withRecovery = {0}")
+ @ValueSource(booleans = {false, true})
+ void testChooseForRwTxOperationAfterDropRegisteredIndex(boolean withRecovery) {
+ createIndex(INDEX_NAME);
+ dropIndex(INDEX_NAME);
+
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ if (withRecovery) {
+ recoverIndexCollector();
+ }
+
+ assertThat(
+ chooseForRwTxOperation(catalogManager.latestCatalogVersion()),
+ contains(index(catalogVersion, PK_INDEX_NAME))
+ );
+ }
+
+ @ParameterizedTest(name = "withRecovery = {0}")
+ @ValueSource(booleans = {false, true})
+ void testChooseForRwTxOperationAfterDropAvailableIndex(boolean withRecovery) {
+ createIndex(INDEX_NAME);
+ makeIndexAvailable(INDEX_NAME);
+
+ int catalogVersionAfterMakeIndexAvailable = catalogManager.latestCatalogVersion();
+
+ dropIndex(INDEX_NAME);
+
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ if (withRecovery) {
+ recoverIndexCollector();
+ }
+
+ assertThat(
+ chooseForRwTxOperation(catalogVersion),
+ contains(index(catalogVersion, PK_INDEX_NAME), index(catalogVersionAfterMakeIndexAvailable, INDEX_NAME))
+ );
+ }
+
+ @ParameterizedTest(name = "withRecovery = {0}")
+ @ValueSource(booleans = {false, true})
+ void testChooseForRwTxOperationComplexCase(boolean withRecovery) {
+ String indexName1 = INDEX_NAME + 1;
+ String indexName2 = INDEX_NAME + 2;
+ String indexName3 = INDEX_NAME + 3;
+ String indexName4 = INDEX_NAME + 4;
+ String indexName5 = INDEX_NAME + 5;
+
+ // after execute: I0(A) I1(R) I2(R)
+ executeCatalogCommands(toCreateHashIndexCommand(indexName1), toCreateHashIndexCommand(indexName2));
+
+ // after execute: I0(A) I1(A)
+ executeCatalogCommands(toMakeAvailableIndexCommand(indexName1), toDropIndexCommand(indexName2));
+
+ // after execute: I0(A) I1(A) I3(R) I4(R)
+ executeCatalogCommands(toCreateHashIndexCommand(indexName3), toCreateHashIndexCommand(indexName4));
+
+ // after execute: I0(A) I1(A) I3(A) I4(A)
+ executeCatalogCommands(toMakeAvailableIndexCommand(indexName3), toMakeAvailableIndexCommand(indexName4));
+
+ int catalogVersionBeforeDropIndex3And4 = catalogManager.latestCatalogVersion();
+
+ // after execute: I0(A) I1(A)
+ executeCatalogCommands(toDropIndexCommand(indexName4), toDropIndexCommand(indexName3));
+
+ int catalogVersionBeforeDropIndex1 = catalogManager.latestCatalogVersion();
+
+ // after execute: I0(A)
+ executeCatalogCommands(toDropIndexCommand(indexName1));
+
+ // after execute: I0(A) I5(R)
+ executeCatalogCommands(toCreateHashIndexCommand(indexName5));
+
+ // Let's check.
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ if (withRecovery) {
+ recoverIndexCollector();
+ }
+
+ assertThat(
+ chooseForRwTxOperation(catalogVersion),
+ contains(
+ index(catalogVersion, PK_INDEX_NAME), // Alive available index0 (pk)
+ index(catalogVersionBeforeDropIndex1, indexName1), // Dropped available index1
+ index(catalogVersionBeforeDropIndex3And4, indexName3), // Dropped available index3
+ index(catalogVersionBeforeDropIndex3And4, indexName4), // Dropped available index4
+ index(catalogVersion, indexName5) // Alive registered index5
+ )
+ );
+ }
+
+ private void createIndex(String indexName) {
+ TestIndexManagementUtils.createIndex(catalogManager, TABLE_NAME, indexName, COLUMN_NAME);
+ }
+
+ private void makeIndexAvailable(String indexName) {
+ int indexId = indexId(catalogManager, indexName, clock);
+
+ TestIndexManagementUtils.makeIndexAvailable(catalogManager, indexId);
+ }
+
+ private void dropIndex(String indexName) {
+ TestIndexManagementUtils.dropIndex(catalogManager, indexName);
+ }
+
+ private List<CatalogIndexDescriptor> chooseForRwTxOperation(int catalogVersion) {
+ return indexChooser.chooseForRwTxUpdateOperation(catalogVersion, tableId);
+ }
+
+ private void executeCatalogCommands(CatalogCommand... commands) {
+ assertThat(catalogManager.execute(List.of(commands)), willCompleteSuccessfully());
+ }
+
+ private CatalogIndexDescriptor index(int catalogVersion, String indexName) {
+ CatalogIndexDescriptor res = catalogManager.indexes(catalogVersion, tableId).stream()
+ .filter(index -> indexName.equals(index.name()))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(res, "catalogVersion=" + catalogVersion + ", indexName=" + indexName);
+
+ return res;
+ }
+
+ private CatalogCommand toMakeAvailableIndexCommand(String indexName) {
+ int indexId = indexId(catalogManager, indexName, clock);
+
+ return MakeIndexAvailableCommand.builder()
+ .indexId(indexId)
+ .build();
+ }
+
+ private void recoverIndexCollector() {
+ indexChooser.close();
+
+ indexChooser = new IndexChooser(catalogManager);
+
+ indexChooser.recover();
+ }
+
+ private static CatalogCommand toCreateHashIndexCommand(String indexName) {
+ return CreateHashIndexCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+ .indexName(indexName)
+ .columns(List.of(COLUMN_NAME))
+ .unique(false)
+ .build();
+ }
+
+ private static CatalogCommand toDropIndexCommand(String indexName) {
+ return DropIndexCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .indexName(indexName)
+ .build();
+ }
+}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
index e352801dfe..4581daeaa8 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.index;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -63,6 +64,8 @@ class TestIndexManagementUtils {
static final String INDEX_NAME = "test-index";
+ static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
static final ClusterNode LOCAL_NODE = new ClusterNodeImpl(NODE_ID, NODE_NAME, mock(NetworkAddress.class));
static void createTable(CatalogManager catalogManager, String tableName, String columnName) {