You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/01 14:26:38 UTC

(ignite-3) branch main updated: IGNITE-20119 Switch index to STOPPING state as a reaction to DROP INDEX (#3124)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a92578cff5 IGNITE-20119 Switch index to STOPPING state as a reaction to DROP INDEX (#3124)
a92578cff5 is described below

commit a92578cff5b5add99dc8442250ab79b242fa7196
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Feb 1 18:26:32 2024 +0400

    IGNITE-20119 Switch index to STOPPING state as a reaction to DROP INDEX (#3124)
---
 .../catalog/commands/DropIndexCommand.java         |  24 ++-
 .../catalog/commands/DropIndexCommandBuilder.java  |   2 +-
 .../catalog/commands/DropTableCommand.java         |   9 +-
 .../catalog/commands/RemoveIndexCommand.java       |  85 +++++++++
 ...Builder.java => RemoveIndexCommandBuilder.java} |  17 +-
 .../descriptors/CatalogHashIndexDescriptor.java    |   2 -
 .../catalog/descriptors/CatalogIndexStatus.java    |  19 +-
 .../internal/catalog/events/CatalogEvent.java      |  17 +-
 ...meters.java => RemoveIndexEventParameters.java} |  23 +--
 ...ters.java => StoppingIndexEventParameters.java} |  10 +-
 .../storage/AbstractChangeIndexStatusEntry.java    |   4 +-
 .../internal/catalog/storage/DropIndexEntry.java   |  49 +----
 .../{DropIndexEntry.java => RemoveIndexEntry.java} |  37 +---
 .../internal/catalog/CatalogManagerSelfTest.java   | 209 ++++++++++++++++-----
 ...ractChangeIndexStatusCommandValidationTest.java |  48 ++---
 .../commands/AbstractCommandValidationTest.java    |  21 +++
 .../catalog/commands/CatalogUtilsTest.java         |  60 ++++--
 .../commands/RemoveIndexCommandValidationTest.java |  47 +++++
 .../ignite/internal/catalog/CatalogTestUtils.java  |  26 ++-
 .../index/IndexAvailabilityController.java         |  25 ++-
 .../internal/index/IndexBuildController.java       |  10 +-
 .../index/IndexBuildingStarterController.java      |  12 +-
 .../apache/ignite/internal/index/IndexChooser.java |  87 ++++-----
 .../internal/index/IndexManagementUtils.java       |  18 +-
 .../apache/ignite/internal/index/IndexManager.java |  51 ++++-
 .../ignite/internal/index/IndexChooserTest.java    |  54 +++++-
 .../ignite/internal/index/IndexManagerTest.java    |  12 +-
 .../internal/index/TestIndexManagementUtils.java   |   4 +
 .../ignite/internal/table/TableTestUtils.java      |  16 ++
 29 files changed, 710 insertions(+), 288 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
index f147883d04..9e9c52045e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
@@ -26,13 +26,23 @@ import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.storage.DropIndexEntry;
+import org.apache.ignite.internal.catalog.storage.RemoveIndexEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateEntry;
 
 /**
  * A command that drops index with specified name.
+ *
+ * <ul>
+ *     <li>If the index never was {@link CatalogIndexStatus#AVAILABLE}, it removes it from the Catalog right away.</li>
+ *     <li>If it is currently {@link CatalogIndexStatus#AVAILABLE}, moves it to {@link CatalogIndexStatus#STOPPING}.</li>
+ *     <li>If it is already {@link CatalogIndexStatus#STOPPING}, fails (as the index is already dropped).</li>
+ * </ul>
+ *
+ * <p>Not to be confused with {@link RemoveIndexCommand}.
  */
 public class DropIndexCommand extends AbstractIndexCommand {
     /** Returns builder to create a command to drop index with specified name. */
@@ -66,9 +76,17 @@ public class DropIndexCommand extends AbstractIndexCommand {
             throw new CatalogValidationException("Dropping primary key index is not allowed");
         }
 
-        return List.of(
-                new DropIndexEntry(index.id(), index.tableId(), schemaName)
-        );
+        switch (index.status()) {
+            case REGISTERED:
+            case BUILDING:
+                return List.of(new RemoveIndexEntry(index.id()));
+            case AVAILABLE:
+                return List.of(new DropIndexEntry(index.id(), index.tableId()));
+            case STOPPING:
+                throw new CatalogValidationException("Dropping an already dropped index is not allowed");
+            default:
+                throw new IllegalStateException("Unknown index status: " + index.status());
+        }
     }
 
     private static class Builder implements DropIndexCommandBuilder {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java
index fae04b24a6..51849afa09 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.catalog.commands;
 
 /**
- * Builder of a command that drop specified index.
+ * Builder of a command that drops specified index.
  *
  * <p>A builder is considered to be reusable, thus implementation have
  * to make sure invocation of {@link #build()} method doesn't cause any
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
index 6fde75117d..b5365fd244 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
@@ -28,8 +28,8 @@ import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.storage.DropIndexEntry;
 import org.apache.ignite.internal.catalog.storage.DropTableEntry;
+import org.apache.ignite.internal.catalog.storage.RemoveIndexEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateEntry;
 
 /**
@@ -55,7 +55,12 @@ public class DropTableCommand extends AbstractTableCommand {
 
         Arrays.stream(schema.indexes())
                 .filter(index -> index.tableId() == table.id())
-                .forEach(index -> updateEntries.add(new DropIndexEntry(index.id(), index.tableId(), schemaName)));
+                .forEach(index -> {
+                    // We can remove AVAILABLE/STOPPING index right away as the only reason to have an index in the STOPPING state is to
+                    // allow RW transactions started before the index drop to write to it, but as the table is already dropped,
+                    // the writes are not possible in any case.
+                    updateEntries.add(new RemoveIndexEntry(index.id()));
+                });
 
         updateEntries.add(new DropTableEntry(table.id(), schemaName));
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java
new file mode 100644
index 0000000000..2e34f847b7
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.storage.RemoveIndexEntry;
+import org.apache.ignite.internal.catalog.storage.UpdateEntry;
+
+/**
+ * A command that removes index with specified ID from the Catalog. Only makes sense for an index in the {@link CatalogIndexStatus#STOPPING}
+ * state (so it's about removing a dropped index from the Catalog when we don't need it anymore).
+ *
+ * <p>This is always invoked as a reaction to an internal trigger, not directly by the end user.
+ *
+ * <p>For dropping an index, please refer to {@link DropIndexCommand}.
+ *
+ * @see DropIndexCommand
+ */
+public class RemoveIndexCommand implements CatalogCommand {
+    /** Returns builder to create a command to remove index with specified ID. */
+    public static RemoveIndexCommandBuilder builder() {
+        return new Builder();
+    }
+
+    private final int indexId;
+
+    /**
+     * Constructor.
+     *
+     * @param indexId ID of the index to remove.
+     */
+    private RemoveIndexCommand(int indexId) {
+        this.indexId = indexId;
+    }
+
+    @Override
+    public List<UpdateEntry> get(Catalog catalog) {
+        CatalogIndexDescriptor index = indexOrThrow(catalog, indexId);
+
+        if (index.status() != STOPPING) {
+            throw new CatalogValidationException("Cannot remove index {} because its status is {}", indexId, index.status());
+        }
+
+        return List.of(new RemoveIndexEntry(indexId));
+    }
+
+    private static class Builder implements RemoveIndexCommandBuilder {
+        private int indexId;
+
+        @Override
+        public Builder indexId(int indexId) {
+            this.indexId = indexId;
+
+            return this;
+        }
+
+        @Override
+        public CatalogCommand build() {
+            return new RemoveIndexCommand(indexId);
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandBuilder.java
similarity index 68%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandBuilder.java
index fae04b24a6..d6bc39382b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandBuilder.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandBuilder.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.internal.catalog.commands;
 
-/**
- * Builder of a command that drop specified index.
- *
- * <p>A builder is considered to be reusable, thus implementation have
- * to make sure invocation of {@link #build()} method doesn't cause any
- * side effects on builder's state or any object created by the same builder.
- */
-public interface DropIndexCommandBuilder extends AbstractIndexCommandBuilder<DropIndexCommandBuilder> {
+import org.apache.ignite.internal.catalog.CatalogCommand;
+
+/** Builder for {@link RemoveIndexCommand}. */
+public interface RemoveIndexCommandBuilder {
+    /** Index ID. */
+    RemoveIndexCommandBuilder indexId(int indexId);
+
+    /** Builds a new command with specified parameters. */
+    CatalogCommand build();
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java
index 05ccbad711..8e7b0c6b45 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java
@@ -80,5 +80,3 @@ public class CatalogHashIndexDescriptor extends CatalogIndexDescriptor {
         return S.toString(this);
     }
 }
-
-
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexStatus.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexStatus.java
index 5706881efd..adfcaff116 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexStatus.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexStatus.java
@@ -22,8 +22,12 @@ package org.apache.ignite.internal.catalog.descriptors;
  *
  * <p>Possible status transitions:</p>
  * <ul>
- *     <li>{@link #REGISTERED} -> {@link #BUILDING} -> {@link #AVAILABLE}.</li>
- *     <li>{@link #AVAILABLE} (PK index).</li>
+ *     <li>[not-existent] -> {@link #REGISTERED} -> {@link #BUILDING} -> {@link #AVAILABLE}.</li>
+ *     <li>[not-existent] -> {@link #AVAILABLE} (PK index).</li>
+ *     <li>{@link #AVAILABLE} -> {@link #STOPPING} -> [removed]. (when dropping an index, but not its table)</li>
+ *     <li>{@link #AVAILABLE} -> [removed]. (when dropping the table of an index)</li>
+ *     <li>{@link #REGISTERED} -> [removed].</li>
+ *     <li>{@link #BUILDING} -> [removed].</li>
  * </ul>
  */
 public enum CatalogIndexStatus {
@@ -46,5 +50,14 @@ public enum CatalogIndexStatus {
      *
      * <p>Readable and writable.</p>
      */
-    AVAILABLE
+    AVAILABLE,
+
+    /**
+     * DROP INDEX command has been executed, the index is waiting for RW transactions started when the index was {@link #AVAILABLE}
+     * to finish. After the wait is finished, the index will automatically be removed from the Catalog.
+     *
+     * <p>New RW transactions cannot read the index, but they write to it. RO transactions can still read from it if the readTimestamp
+     * corresponds to a moment when the index was still {@link #AVAILABLE}.</p>
+     */
+    STOPPING
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
index 77558e0365..2c65d1f516 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog.events;
 
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.event.Event;
 
 /**
@@ -35,15 +36,25 @@ public enum CatalogEvent implements Event {
     /** This event is fired, when an index was created in Catalog. */
     INDEX_CREATE,
 
-    /** This event is fired, when an index was dropped in Catalog. */
-    INDEX_DROP,
-
     /** This event is fired when the index is ready to start building. */
     INDEX_BUILDING,
 
     /** This event is fired when the index becomes available, i.e. the index has been built. */
     INDEX_AVAILABLE,
 
+    /**
+     * This event is fired when an {@link CatalogIndexStatus#AVAILABLE} index was dropped in the Catalog (so it's switched to
+     * the {@link CatalogIndexStatus#STOPPING} state), but not its table.
+     */
+    INDEX_STOPPING,
+
+    /**
+     *  Fired when an index is removed from the Catalog. This happens when an index that never was {@link CatalogIndexStatus#AVAILABLE}
+     *  gets dropped, or when an index that is {@link CatalogIndexStatus#STOPPING} is finished with and we don't need to keep it in
+     *  the Catalog anymore, or when an index gets dropped because its table gets dropped.
+     */
+    INDEX_REMOVED,
+
     /** This event is fired, when a distribution zone was created in Catalog. */
     ZONE_CREATE,
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/RemoveIndexEventParameters.java
similarity index 66%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/RemoveIndexEventParameters.java
index 96b0d16942..d299b04e05 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/RemoveIndexEventParameters.java
@@ -17,37 +17,30 @@
 
 package org.apache.ignite.internal.catalog.events;
 
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+
 /**
- * Drop index event parameters that contains an id of dropped index.
+ * Event parameters for an 'index removed from the Catalog' event (don't confuse it with {@link StoppingIndexEventParameters}
+ * that is about switching the index to the {@link CatalogIndexStatus#STOPPING} state).
  */
-public class DropIndexEventParameters extends CatalogEventParameters {
-
+public class RemoveIndexEventParameters extends CatalogEventParameters {
     private final int indexId;
 
-    private final int tableId;
-
     /**
      * Constructor.
      *
      * @param causalityToken Causality token.
      * @param catalogVersion Catalog version.
-     * @param indexId An id of dropped index.
-     * @param tableId Table ID for which the index was removed.
+     * @param indexId Index ID.
      */
-    public DropIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) {
+    public RemoveIndexEventParameters(long causalityToken, int catalogVersion, int indexId) {
         super(causalityToken, catalogVersion);
 
         this.indexId = indexId;
-        this.tableId = tableId;
     }
 
-    /** Returns an id of dropped index. */
+    /** Returns index ID. */
     public int indexId() {
         return indexId;
     }
-
-    /** Returns table ID for which the index was removed. */
-    public int tableId() {
-        return tableId;
-    }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
similarity index 77%
rename from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
rename to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
index 96b0d16942..51e1536a26 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.internal.catalog.events;
 
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+
 /**
- * Drop index event parameters that contains an id of dropped index.
+ * Event parameters for the 'index has moved to the {@link CatalogIndexStatus#STOPPING} state' event; contains an id of the dropped index.
+ *
+ * @see CatalogEvent#INDEX_STOPPING
  */
-public class DropIndexEventParameters extends CatalogEventParameters {
+public class StoppingIndexEventParameters extends CatalogEventParameters {
 
     private final int indexId;
 
@@ -34,7 +38,7 @@ public class DropIndexEventParameters extends CatalogEventParameters {
      * @param indexId An id of dropped index.
      * @param tableId Table ID for which the index was removed.
      */
-    public DropIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) {
+    public StoppingIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) {
         super(causalityToken, catalogVersion);
 
         this.indexId = indexId;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
index 3a384df0f5..1199372aa6 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
@@ -43,7 +43,7 @@ abstract class AbstractChangeIndexStatusEntry implements UpdateEntry {
     }
 
     @Override
-    public Catalog applyUpdate(Catalog catalog, long causalityToken) {
+    public final Catalog applyUpdate(Catalog catalog, long causalityToken) {
         CatalogSchemaDescriptor schema = schemaByIndexId(catalog, indexId);
 
         return new Catalog(
@@ -64,7 +64,7 @@ abstract class AbstractChangeIndexStatusEntry implements UpdateEntry {
         );
     }
 
-    private static CatalogSchemaDescriptor schemaByIndexId(Catalog catalog, int indexId) {
+    static CatalogSchemaDescriptor schemaByIndexId(Catalog catalog, int indexId) {
         CatalogIndexDescriptor index = catalog.index(indexId);
 
         assert index != null : indexId;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 564327f9e8..c31631a54b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -17,40 +17,31 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import java.util.Arrays;
-import java.util.Objects;
-import org.apache.ignite.internal.catalog.Catalog;
-import org.apache.ignite.internal.catalog.commands.CatalogUtils;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
- * Describes deletion of an index.
+ * Describes drop of an index (it's not the final removal of an index from the Catalog, but just a switch to
+ * the {@link CatalogIndexStatus#STOPPING} state).
  */
-public class DropIndexEntry implements UpdateEntry, Fireable {
+public class DropIndexEntry extends AbstractChangeIndexStatusEntry implements Fireable {
     private static final long serialVersionUID = -604729846502020728L;
 
-    private final int indexId;
-
     private final int tableId;
 
-    private final String schemaName;
-
     /**
      * Constructs the object.
      *
      * @param indexId An id of an index to drop.
      * @param tableId Table ID for which the index was removed.
-     * @param schemaName Schema name.
      */
-    public DropIndexEntry(int indexId, int tableId, String schemaName) {
-        this.indexId = indexId;
+    public DropIndexEntry(int indexId, int tableId) {
+        super(indexId, CatalogIndexStatus.STOPPING);
+
         this.tableId = tableId;
-        this.schemaName = schemaName;
     }
 
     /** Returns an id of an index to drop. */
@@ -65,32 +56,12 @@ public class DropIndexEntry implements UpdateEntry, Fireable {
 
     @Override
     public CatalogEvent eventType() {
-        return CatalogEvent.INDEX_DROP;
+        return CatalogEvent.INDEX_STOPPING;
     }
 
     @Override
     public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) {
-        return new DropIndexEventParameters(causalityToken, catalogVersion, indexId, tableId);
-    }
-
-    @Override
-    public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
-
-        return new Catalog(
-                catalog.version(),
-                catalog.time(),
-                catalog.objectIdGenState(),
-                catalog.zones(),
-                CatalogUtils.replaceSchema(new CatalogSchemaDescriptor(
-                        schema.id(),
-                        schema.name(),
-                        schema.tables(),
-                        Arrays.stream(schema.indexes()).filter(t -> t.id() != indexId).toArray(CatalogIndexDescriptor[]::new),
-                        schema.systemViews(),
-                        causalityToken
-                ), catalog.schemas())
-        );
+        return new StoppingIndexEventParameters(causalityToken, catalogVersion, indexId, tableId);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java
similarity index 70%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java
index 564327f9e8..a606b17b4c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RemoveIndexEntry.java
@@ -17,65 +17,48 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static org.apache.ignite.internal.catalog.storage.AbstractChangeIndexStatusEntry.schemaByIndexId;
+
 import java.util.Arrays;
-import java.util.Objects;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
- * Describes deletion of an index.
+ * Describes removal of an index from the Catalog (not the same as dropping it [that just initiates the drop sequence]).
  */
-public class DropIndexEntry implements UpdateEntry, Fireable {
-    private static final long serialVersionUID = -604729846502020728L;
+public class RemoveIndexEntry implements UpdateEntry, Fireable {
+    private static final long serialVersionUID = -6193287919562795933L;
 
     private final int indexId;
 
-    private final int tableId;
-
-    private final String schemaName;
-
     /**
      * Constructs the object.
      *
      * @param indexId An id of an index to drop.
-     * @param tableId Table ID for which the index was removed.
-     * @param schemaName Schema name.
      */
-    public DropIndexEntry(int indexId, int tableId, String schemaName) {
+    public RemoveIndexEntry(int indexId) {
         this.indexId = indexId;
-        this.tableId = tableId;
-        this.schemaName = schemaName;
-    }
-
-    /** Returns an id of an index to drop. */
-    public int indexId() {
-        return indexId;
-    }
-
-    /** Returns table ID for which the index was removed. */
-    public int tableId() {
-        return tableId;
     }
 
     @Override
     public CatalogEvent eventType() {
-        return CatalogEvent.INDEX_DROP;
+        return CatalogEvent.INDEX_REMOVED;
     }
 
     @Override
     public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) {
-        return new DropIndexEventParameters(causalityToken, catalogVersion, indexId, tableId);
+        return new RemoveIndexEventParameters(causalityToken, catalogVersion, indexId);
     }
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+        CatalogSchemaDescriptor schema = schemaByIndexId(catalog, indexId);
 
         return new Catalog(
                 catalog.version(),
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index c3428141e0..7600345f5c 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -46,6 +46,7 @@ import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollat
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
@@ -78,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.clearInvocations;
@@ -110,11 +112,13 @@ import org.apache.ignite.internal.catalog.commands.DefaultValue;
 import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
 import org.apache.ignite.internal.catalog.commands.DropZoneCommand;
 import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
 import org.apache.ignite.internal.catalog.commands.RenameTableCommand;
 import org.apache.ignite.internal.catalog.commands.RenameZoneCommand;
 import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
 import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor;
@@ -128,12 +132,13 @@ import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
 import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
 import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
 import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
 import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateLog;
 import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
@@ -374,9 +379,9 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         assertNull(manager.table(TABLE_NAME, clock.nowLong()));
         assertNull(manager.table(table1.id(), clock.nowLong()));
 
-        assertNull(schema.index(pkIndexName(TABLE_NAME)));
-        assertNull(manager.index(pkIndexName(TABLE_NAME), clock.nowLong()));
-        assertNull(manager.index(pkIndex1.id(), clock.nowLong()));
+        assertThat(schema.index(pkIndexName(TABLE_NAME)), is(nullValue()));
+        assertThat(manager.index(pkIndexName(TABLE_NAME), clock.nowLong()), is(nullValue()));
+        assertThat(manager.index(pkIndex1.id(), clock.nowLong()), is(nullValue()));
 
         assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong()));
         assertSame(table2, manager.table(table2.id(), clock.nowLong()));
@@ -939,14 +944,17 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
     @Test
     public void testDropTableWithIndex() {
         assertThat(manager.execute(simpleTable(TABLE_NAME)), willBe(nullValue()));
-        assertThat(manager.execute(simpleIndex()), willBe(nullValue()));
+        assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME)), willBe(nullValue()));
+        startBuildingIndex(indexId(INDEX_NAME));
+        makeIndexAvailable(indexId(INDEX_NAME));
 
         long beforeDropTimestamp = clock.nowLong();
+        int beforeDropVersion = manager.latestCatalogVersion();
 
         assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = manager.schema(2);
+        CatalogSchemaDescriptor schema = manager.schema(beforeDropVersion);
         CatalogTableDescriptor table = schema.table(TABLE_NAME);
         CatalogIndexDescriptor index = schema.index(INDEX_NAME);
 
@@ -961,7 +969,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         assertSame(index, manager.index(index.id(), beforeDropTimestamp));
 
         // Validate actual catalog
-        schema = manager.schema(3);
+        schema = manager.schema(manager.latestCatalogVersion());
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
@@ -971,9 +979,9 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         assertNull(manager.table(TABLE_NAME, clock.nowLong()));
         assertNull(manager.table(table.id(), clock.nowLong()));
 
-        assertNull(schema.index(INDEX_NAME));
-        assertNull(manager.index(INDEX_NAME, clock.nowLong()));
-        assertNull(manager.index(index.id(), clock.nowLong()));
+        assertThat(schema.index(INDEX_NAME), is(nullValue()));
+        assertThat(manager.index(INDEX_NAME, clock.nowLong()), is(nullValue()));
+        assertThat(manager.index(index.id(), clock.nowLong()), is(nullValue()));
     }
 
     @Test
@@ -1158,7 +1166,10 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         when(eventListener.notify(any(), any())).thenReturn(falseCompletedFuture());
 
         manager.listen(CatalogEvent.INDEX_CREATE, eventListener);
-        manager.listen(CatalogEvent.INDEX_DROP, eventListener);
+        manager.listen(CatalogEvent.INDEX_BUILDING, eventListener);
+        manager.listen(CatalogEvent.INDEX_AVAILABLE, eventListener);
+        manager.listen(CatalogEvent.INDEX_STOPPING, eventListener);
+        manager.listen(CatalogEvent.INDEX_REMOVED, eventListener);
 
         // Try to create index without table.
         assertThat(manager.execute(createIndexCmd), willThrow(TableNotFoundValidationException.class));
@@ -1174,11 +1185,21 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         assertThat(manager.execute(createIndexCmd), willCompleteSuccessfully());
         verify(eventListener).notify(any(CreateIndexEventParameters.class), isNull());
 
+        startBuildingIndex(indexId(INDEX_NAME));
+        verify(eventListener).notify(any(StartBuildingIndexEventParameters.class), isNull());
+
+        makeIndexAvailable(indexId(INDEX_NAME));
+        verify(eventListener).notify(any(MakeIndexAvailableEventParameters.class), isNull());
+
         clearInvocations(eventListener);
 
         // Drop index.
         assertThat(manager.execute(dropIndexCmd), willBe(nullValue()));
-        verify(eventListener).notify(any(DropIndexEventParameters.class), isNull());
+        verify(eventListener).notify(any(StoppingIndexEventParameters.class), isNull());
+
+        // Remove index.
+        removeIndex(indexId(INDEX_NAME));
+        verify(eventListener).notify(any(RemoveIndexEventParameters.class), isNull());
 
         clearInvocations(eventListener);
 
@@ -1188,7 +1209,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         // Try drop index once again.
         assertThat(manager.execute(dropIndexCmd), willThrow(IndexNotFoundValidationException.class));
 
-        verify(eventListener).notify(any(DropIndexEventParameters.class), isNull());
+        verify(eventListener).notify(any(RemoveIndexEventParameters.class), isNull());
     }
 
     @Test
@@ -1556,39 +1577,46 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
 
         assertThat(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL"))), willBe(nullValue()));
 
+        int indexId = manager.index(INDEX_NAME, clock.nowLong()).id();
+
+        startBuildingIndex(indexId);
+        makeIndexAvailable(indexId);
+
         int tableId = manager.table(TABLE_NAME, clock.nowLong()).id();
         int pkIndexId = manager.index(pkIndexName(TABLE_NAME), clock.nowLong()).id();
-        int indexId = manager.index(INDEX_NAME, clock.nowLong()).id();
 
         assertNotEquals(tableId, indexId);
 
-        EventListener<CatalogEventParameters> eventListener = mock(EventListener.class);
+        EventListener<StoppingIndexEventParameters> stoppingListener = mock(EventListener.class);
+        EventListener<RemoveIndexEventParameters> removedListener = mock(EventListener.class);
 
-        ArgumentCaptor<DropIndexEventParameters> captor = ArgumentCaptor.forClass(DropIndexEventParameters.class);
+        ArgumentCaptor<StoppingIndexEventParameters> stoppingCaptor = ArgumentCaptor.forClass(StoppingIndexEventParameters.class);
+        ArgumentCaptor<RemoveIndexEventParameters> removingCaptor = ArgumentCaptor.forClass(RemoveIndexEventParameters.class);
 
-        doReturn(falseCompletedFuture()).when(eventListener).notify(captor.capture(), any());
+        doReturn(falseCompletedFuture()).when(stoppingListener).notify(stoppingCaptor.capture(), any());
+        doReturn(falseCompletedFuture()).when(removedListener).notify(removingCaptor.capture(), any());
 
-        manager.listen(CatalogEvent.INDEX_DROP, eventListener);
+        manager.listen(CatalogEvent.INDEX_STOPPING, stoppingListener);
+        manager.listen(CatalogEvent.INDEX_REMOVED, removedListener);
 
-        // Let's remove the index.
+        // Let's drop the index.
         assertThat(
                 manager.execute(DropIndexCommand.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
                 willBe(nullValue())
         );
 
-        DropIndexEventParameters eventParameters = captor.getValue();
+        StoppingIndexEventParameters stoppingEventParameters = stoppingCaptor.getValue();
 
-        assertEquals(indexId, eventParameters.indexId());
-        assertEquals(tableId, eventParameters.tableId());
+        assertEquals(indexId, stoppingEventParameters.indexId());
+        assertEquals(tableId, stoppingEventParameters.tableId());
 
-        // Let's delete the table.
+        // Let's drop the table.
         assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willBe(nullValue()));
 
-        // Let's make sure that the PK index has been deleted.
-        eventParameters = captor.getValue();
+        // Let's make sure that the PK index has been removed.
+        RemoveIndexEventParameters pkRemovedEventParameters = removingCaptor.getAllValues().get(0);
 
-        assertEquals(pkIndexId, eventParameters.indexId());
-        assertEquals(tableId, eventParameters.tableId());
+        assertEquals(pkIndexId, pkRemovedEventParameters.indexId());
     }
 
     @Test
@@ -1797,6 +1825,107 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
         );
     }
 
+    @Test
+    void droppingAnAvailableIndexMovesItToStoppingState() {
+        createSomeTable(TABLE_NAME);
+        createSomeIndex(TABLE_NAME, INDEX_NAME);
+        startBuildingIndex(indexId(INDEX_NAME));
+        makeIndexAvailable(indexId(INDEX_NAME));
+
+        dropIndex(INDEX_NAME);
+
+        CatalogIndexDescriptor index = index(manager.latestCatalogVersion(), INDEX_NAME);
+
+        assertThat(index, is(notNullValue()));
+        assertThat(index.status(), is(STOPPING));
+    }
+
+    @ParameterizedTest
+    @MethodSource("notYetAvailableIndexStatuses")
+    void droppingNotAvailableIndexRemovesIt(CatalogIndexStatus status) {
+        createSomeTable(TABLE_NAME);
+        createSomeIndex(TABLE_NAME, INDEX_NAME);
+
+        rollIndexStatusTo(status, indexId(INDEX_NAME));
+
+        dropIndex(INDEX_NAME);
+
+        CatalogIndexDescriptor index = index(manager.latestCatalogVersion(), INDEX_NAME);
+
+        assertThat(index, is(nullValue()));
+    }
+
+    private void startBuildingIndex(int indexId) {
+        assertThat(manager.execute(StartBuildingIndexCommand.builder().indexId(indexId).build()), willCompleteSuccessfully());
+    }
+
+    private static Stream<Arguments> notYetAvailableIndexStatuses() {
+        return Stream.of(Arguments.of(REGISTERED), Arguments.of(BUILDING));
+    }
+
+    @Test
+    void removingStoppedIndexRemovesItFromCatalog() {
+        createSomeTable(TABLE_NAME);
+        createSomeIndex(TABLE_NAME, INDEX_NAME);
+
+        rollIndexStatusTo(STOPPING, indexId(INDEX_NAME));
+
+        assertThat(index(manager.latestCatalogVersion(), INDEX_NAME).status(), is(STOPPING));
+
+        removeIndex(indexId(INDEX_NAME));
+
+        CatalogIndexDescriptor index = index(manager.latestCatalogVersion(), INDEX_NAME);
+
+        assertThat(index, is(nullValue()));
+    }
+
+    /** Advances the index through its lifecycle, one command per status, stopping once {@code status} is reached. */
+    private void rollIndexStatusTo(CatalogIndexStatus status, int indexId) {
+        for (CatalogIndexStatus current : List.of(REGISTERED, BUILDING, AVAILABLE, STOPPING)) {
+            if (current == status) {
+                return;
+            }
+
+            switch (current) {
+                case REGISTERED:
+                    startBuildingIndex(indexId);
+                    break;
+                case BUILDING:
+                    makeIndexAvailable(indexId);
+                    break;
+                case AVAILABLE:
+                    dropIndex(indexId);
+                    break;
+                case STOPPING:
+                    removeIndex(indexId);
+                    break;
+                default:
+                    fail("Unsupported state: " + current);
+            }
+        }
+    }
+
+    /**
+     * Executes {@link RemoveIndexCommand} for the given index ID and asserts that the command completes successfully.
+     */
+    private void removeIndex(int indexId) {
+        assertThat(manager.execute(RemoveIndexCommand.builder().indexId(indexId).build()), willCompleteSuccessfully());
+    }
+
+    /** Executes {@link DropIndexCommand} for {@code indexName} in {@code DEFAULT_SCHEMA_NAME}; the command must succeed. */
+    private void dropIndex(String indexName) {
+        assertThat(
+                manager.execute(DropIndexCommand.builder().indexName(indexName).schemaName(DEFAULT_SCHEMA_NAME).build()),
+                willCompleteSuccessfully());
+    }
+
+    /** Looks the index up by ID (timestamp {@code Long.MAX_VALUE}), asserts it exists and drops it by its name. */
+    private void dropIndex(int indexId) {
+        CatalogIndexDescriptor descriptor = manager.index(indexId, Long.MAX_VALUE);
+        assertThat(descriptor, is(notNullValue()));
+        dropIndex(descriptor.name());
+    }
+
     @Test
     void testDropNotExistingIndex() {
         assertThat(
@@ -1942,19 +2071,19 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
 
         int indexId = indexId(INDEX_NAME);
 
-        assertThat(
-                manager.execute(startBuildingIndexCommand(indexId)),
-                willBe(nullValue())
-        );
+        startBuildingIndex(indexId);
+        makeIndexAvailable(indexId);
+
+        CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME);
 
+        assertEquals(AVAILABLE, index.status());
+    }
+
+    private void makeIndexAvailable(int indexId) {
         assertThat(
                 manager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
                 willBe(nullValue())
         );
-
-        CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME);
-
-        assertEquals(AVAILABLE, index.status());
     }
 
     @Test
@@ -1973,10 +2102,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
                 willBe(nullValue())
         );
 
-        assertThat(
-                manager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
-                willBe(nullValue())
-        );
+        makeIndexAvailable(indexId);
 
         CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME);
 
@@ -2017,10 +2143,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
                 willBe(nullValue())
         );
 
-        assertThat(
-                manager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
-                willBe(nullValue())
-        );
+        makeIndexAvailable(indexId);
 
         assertThat(fireEventFuture, willCompleteSuccessfully());
     }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java
index 193aa2f3c9..3ef5abcbdf 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractChangeIndexStatusCommandValidationTest.java
@@ -17,11 +17,7 @@
 
 package org.apache.ignite.internal.catalog.commands;
 
-import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
-import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
-import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import java.util.List;
@@ -34,7 +30,6 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -58,17 +53,12 @@ public abstract class AbstractChangeIndexStatusCommandValidationTest extends Abs
      */
     abstract boolean isInvalidPreviousIndexStatus(CatalogIndexStatus indexStatus);
 
-    @Test
-    void exceptionIsThrownIfIndexWithGivenNameNotFound() {
-        Catalog catalog = emptyCatalog();
-
-        CatalogCommand command = createCommand(1);
+    Class<? extends Exception> expectedExceptionClassForWrongStatus() {
+        return ChangeIndexStatusValidationException.class;
+    }
 
-        assertThrowsWithCause(
-                () -> command.get(catalog),
-                IndexNotFoundValidationException.class,
-                "Index with ID '1' not found"
-        );
+    String expectedExceptionMessageSubstringForWrongStatus() {
+        return "It is impossible to change the index status:";
     }
 
     @ParameterizedTest
@@ -108,8 +98,8 @@ public abstract class AbstractChangeIndexStatusCommandValidationTest extends Abs
 
         assertThrowsWithCause(
                 () -> command.get(catalog),
-                ChangeIndexStatusValidationException.class,
-                "It is impossible to change the index status:"
+                expectedExceptionClassForWrongStatus(),
+                expectedExceptionMessageSubstringForWrongStatus()
         );
     }
 
@@ -117,20 +107,16 @@ public abstract class AbstractChangeIndexStatusCommandValidationTest extends Abs
         return Stream.of(CatalogIndexStatus.values()).map(Arguments::of);
     }
 
-    private static CatalogTableDescriptor table(int tableId, int schemaId, int zoneId, int pkIndexId, String columnName) {
-        return new CatalogTableDescriptor(
-                tableId,
-                schemaId,
-                pkIndexId,
-                "TEST_TABLE",
-                zoneId,
-                List.of(tableColumn(columnName)),
-                List.of(columnName),
-                null
-        );
-    }
+    @Test
+    void exceptionIsThrownIfIndexWithGivenIdNotFound() {
+        Catalog catalog = emptyCatalog();
+
+        CatalogCommand command = createCommand(1);
 
-    private static CatalogTableColumnDescriptor tableColumn(String columnName) {
-        return new CatalogTableColumnDescriptor(columnName, INT32, false, DEFAULT_PRECISION, DEFAULT_SCALE, DEFAULT_LENGTH, null);
+        assertThrowsWithCause(
+                () -> command.get(catalog),
+                IndexNotFoundValidationException.class,
+                "Index with ID '1' not found"
+        );
     }
 }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
index 6282f8eb62..1b2dec3f3a 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.catalog.commands;
 
 import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.SYSTEM_SCHEMAS;
 import static org.apache.ignite.sql.ColumnType.INT32;
 
@@ -30,6 +33,7 @@ import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.storage.UpdateEntry;
@@ -172,4 +176,21 @@ abstract class AbstractCommandValidationTest extends BaseIgniteAbstractTest {
                 ))
         );
     }
+
+    /** Builds a table descriptor from the given IDs, the literal {@code TEST_TABLE} and one column made by {@code tableColumn}. */
+    static CatalogTableDescriptor table(int tableId, int schemaId, int zoneId, int pkIndexId, String columnName) {
+        List<CatalogTableColumnDescriptor> columns = List.of(tableColumn(columnName));
+        // The same column name is also passed on its own as the following constructor argument.
+        List<String> columnNames = List.of(columnName);
+
+        return new CatalogTableDescriptor(
+                tableId, schemaId, pkIndexId, "TEST_TABLE", zoneId,
+                columns, columnNames,
+                null
+        );
+    }
+
+    static CatalogTableColumnDescriptor tableColumn(String columnName) {
+        return new CatalogTableColumnDescriptor(columnName, INT32, false, DEFAULT_PRECISION, DEFAULT_SCALE, DEFAULT_LENGTH, null);
+    }
 }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
index 99fc939684..28232ec47e 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
@@ -30,6 +30,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -40,6 +41,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogCommand;
@@ -193,6 +195,10 @@ public class CatalogUtilsTest extends BaseIgniteAbstractTest {
 
         dropIndex(indexName1);
 
+        int catalogVersionBeforeRemoveIndex1 = catalogManager.latestCatalogVersion();
+
+        removeIndex(indexName1);
+
         int latestCatalogVersion = catalogManager.latestCatalogVersion();
         int earliestCatalogVersion = catalogManager.earliestCatalogVersion();
 
@@ -203,15 +209,16 @@ public class CatalogUtilsTest extends BaseIgniteAbstractTest {
                 hasItems(index(catalogManager, latestCatalogVersion, PK_INDEX_NAME))
         );
 
-        assertThat(
-                collectIndexes(catalogManager, tableId, earliestCatalogVersion, latestCatalogVersion),
-                hasItems(
-                        index(catalogManager, latestCatalogVersion, PK_INDEX_NAME),
-                        index(catalogManager, catalogVersionBeforeDropIndex0, indexName0),
-                        index(catalogManager, catalogVersionBeforeDropIndex1, indexName1),
-                        index(catalogManager, catalogVersionBeforeDropIndex1, indexName2)
-                )
+        Collection<CatalogIndexDescriptor> collectedIndexes = collectIndexes(
+                catalogManager,
+                tableId,
+                earliestCatalogVersion,
+                latestCatalogVersion
         );
+        assertThat(collectedIndexes, hasItem(index(catalogManager, latestCatalogVersion, PK_INDEX_NAME)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, catalogVersionBeforeDropIndex0, indexName0)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, catalogVersionBeforeRemoveIndex1, indexName1)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, catalogVersionBeforeDropIndex1, indexName2)));
     }
 
     /**
@@ -253,10 +260,12 @@ public class CatalogUtilsTest extends BaseIgniteAbstractTest {
         makeIndexAvailable(indexName0);
         makeIndexAvailable(indexName2);
 
-        int catalogVersionBeforeDropIndex0 = catalogManager.latestCatalogVersion();
-
         dropIndex(indexName0);
 
+        int catalogVersionBeforeRemoveIndex0 = catalogManager.latestCatalogVersion();
+
+        removeIndex(indexName0);
+
         int catalogVersionBeforeDropIndex3 = catalogManager.latestCatalogVersion();
 
         dropIndex(indexName3);
@@ -275,16 +284,17 @@ public class CatalogUtilsTest extends BaseIgniteAbstractTest {
                 )
         );
 
-        assertThat(
-                collectIndexes(catalogManager, tableId, earliestCatalogVersion, latestCatalogVersion),
-                hasItems(
-                        index(catalogManager, latestCatalogVersion, PK_INDEX_NAME),
-                        index(catalogManager, catalogVersionBeforeDropIndex0, indexName0),
-                        index(catalogManager, latestCatalogVersion, indexName1),
-                        index(catalogManager, latestCatalogVersion, indexName2),
-                        index(catalogManager, catalogVersionBeforeDropIndex3, indexName3)
-                )
+        Collection<CatalogIndexDescriptor> collectedIndexes = collectIndexes(
+                catalogManager,
+                tableId,
+                earliestCatalogVersion,
+                latestCatalogVersion
         );
+        assertThat(collectedIndexes, hasItem(index(catalogManager, latestCatalogVersion, PK_INDEX_NAME)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, catalogVersionBeforeRemoveIndex0, indexName0)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, latestCatalogVersion, indexName1)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, latestCatalogVersion, indexName2)));
+        assertThat(collectedIndexes, hasItem(index(catalogManager, catalogVersionBeforeDropIndex3, indexName3)));
     }
 
     @Test
@@ -394,6 +404,18 @@ public class CatalogUtilsTest extends BaseIgniteAbstractTest {
         assertThat(catalogManager.execute(catalogCommand), willCompleteSuccessfully());
     }
 
+    /**
+     * Resolves the index by name at {@code HybridTimestamp.MAX_VALUE} and executes {@link RemoveIndexCommand} for its ID.
+     */
+    private void removeIndex(String indexName) {
+        CatalogIndexDescriptor index = catalogManager.index(indexName, HybridTimestamp.MAX_VALUE.longValue());
+        assertThat(index, is(notNullValue()));
+
+        CatalogCommand removeCommand = RemoveIndexCommand.builder().indexId(index.id()).build();
+
+        assertThat(catalogManager.execute(removeCommand), willCompleteSuccessfully());
+    }
+
     private int tableId(int catalogVersion, String tableName) {
         CatalogTableDescriptor tableDescriptor = catalogManager.tables(catalogVersion).stream()
                 .filter(table -> tableName.equals(table.name()))
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java
new file mode 100644
index 0000000000..2d37e6a21b
--- /dev/null
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/RemoveIndexCommandValidationTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
+
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+
+/** Tests to verify validation of {@link RemoveIndexCommand}. */
+public class RemoveIndexCommandValidationTest extends AbstractChangeIndexStatusCommandValidationTest {
+    @Override
+    CatalogCommand createCommand(int indexId) {
+        return RemoveIndexCommand.builder().indexId(indexId).build();
+    }
+
+    @Override
+    boolean isInvalidPreviousIndexStatus(CatalogIndexStatus indexStatus) {
+        return indexStatus != STOPPING;
+    }
+
+    @Override
+    Class<? extends Exception> expectedExceptionClassForWrongStatus() {
+        return CatalogValidationException.class;
+    }
+
+    @Override
+    String expectedExceptionMessageSubstringForWrongStatus() {
+        return "Cannot remove index";
+    }
+}
diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 773fe79848..18146c6433 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
 
 /** Utilities for working with the catalog in tests. */
 public class CatalogTestUtils {
@@ -363,20 +364,35 @@ public class CatalogTestUtils {
     }
 
     /**
-     * Searches for an index by name in the requested version of the catalog.
+     * Searches for an index by name in the requested version of the catalog. Throws if the index is not found.
      *
      * @param catalogService Catalog service.
      * @param catalogVersion Catalog version in which to find the index.
      * @param indexName Index name.
+     * @return Index (cannot be null).
      */
     public static CatalogIndexDescriptor index(CatalogService catalogService, int catalogVersion, String indexName) {
-        CatalogIndexDescriptor indexDescriptor = catalogService.indexes(catalogVersion).stream()
-                .filter(index -> indexName.equals(index.name()))
-                .findFirst()
-                .orElse(null);
+        CatalogIndexDescriptor indexDescriptor = indexOrNull(catalogService,
+                catalogVersion, indexName);
 
         assertNotNull(indexDescriptor, "catalogVersion=" + catalogVersion + ", indexName=" + indexName);
 
         return indexDescriptor;
     }
+
+    /**
+     * Searches for an index by name in the requested version of the catalog.
+     *
+     * @param catalogService Catalog service.
+     * @param catalogVersion Catalog version in which to find the index.
+     * @param indexName Index name.
+     * @return Index or {@code null} if not found.
+     */
+    @Nullable
+    public static CatalogIndexDescriptor indexOrNull(CatalogService catalogService, int catalogVersion, String indexName) {
+        return catalogService.indexes(catalogVersion).stream()
+                .filter(index -> indexName.equals(index.name()))
+                .findFirst()
+                .orElse(null);
+    }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
index 0c9eb7ad43..6cdcb89753 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -23,10 +23,12 @@ import static java.util.function.Predicate.not;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
 import static org.apache.ignite.internal.index.IndexManagementUtils.PARTITION_BUILD_INDEX_KEY_PREFIX;
 import static org.apache.ignite.internal.index.IndexManagementUtils.extractIndexIdFromPartitionBuildIndexKey;
 import static org.apache.ignite.internal.index.IndexManagementUtils.getPartitionCountFromCatalog;
 import static org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey;
+import static org.apache.ignite.internal.index.IndexManagementUtils.index;
 import static org.apache.ignite.internal.index.IndexManagementUtils.isAnyMetastoreKeyPresentLocally;
 import static org.apache.ignite.internal.index.IndexManagementUtils.isMetastoreKeyAbsentLocally;
 import static org.apache.ignite.internal.index.IndexManagementUtils.makeIndexAvailableInCatalogWithoutFuture;
@@ -54,8 +56,8 @@ import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -93,12 +95,12 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
  *
  * <p>Notes:</p>
  * <ul>
- *     <li>At {@link CatalogEvent#INDEX_DROP},
+ *     <li>At {@link CatalogEvent#INDEX_REMOVED},
  *     {@link IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager, int, int) index building keys} in the metastore
  *     are deleted.</li>
- *     <li>Handling of {@link CatalogEvent#INDEX_BUILDING}, {@link CatalogEvent#INDEX_DROP}, {@link CatalogEvent#INDEX_AVAILABLE} and watch
- *     prefix {@link IndexManagementUtils#PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole cluster (and only one node makes a write to
- *     the metastore) as these events are global, but only one node (a primary replica owning a partition) handles
+ *     <li>Handling of {@link CatalogEvent#INDEX_BUILDING}, {@link CatalogEvent#INDEX_REMOVED}, {@link CatalogEvent#INDEX_AVAILABLE}
+ *     and watch prefix {@link IndexManagementUtils#PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole cluster (and only one node makes
+ *     a write to the metastore) as these events are global, but only one node (a primary replica owning a partition) handles
  *     {@link IndexBuildCompletionListener#onBuildCompletion} (form {@link IndexBuilder#listen}) event.</li>
  *     <li>Restoring index availability occurs in {@link #recover(long)}.</li>
  * </ul>
@@ -195,12 +197,12 @@ class IndexAvailabilityController implements ManuallyCloseable {
             return onIndexBuilding((StartBuildingIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
-        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) -> {
+        catalogService.listen(CatalogEvent.INDEX_REMOVED, (parameters, exception) -> {
             if (exception != null) {
                 return failedFuture(exception);
             }
 
-            return onIndexDrop((DropIndexEventParameters) parameters).thenApply(unused -> false);
+            return onIndexRemoved((RemoveIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
         catalogService.listen(CatalogEvent.INDEX_AVAILABLE, (parameters, exception) -> {
@@ -236,10 +238,17 @@ class IndexAvailabilityController implements ManuallyCloseable {
         });
     }
 
-    private CompletableFuture<?> onIndexDrop(DropIndexEventParameters parameters) {
+    private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             int indexId = parameters.indexId();
 
+            CatalogIndexDescriptor indexBeforeRemoval = index(catalogManager, indexId, parameters.catalogVersion() - 1);
+
+            if (indexBeforeRemoval.status() == STOPPING) {
+                // It has already been built, nothing to do here.
+                return nullCompletedFuture();
+            }
+
             int partitions = getPartitionCountFromCatalog(catalogManager, indexId, parameters.catalogVersion() - 1);
 
             ByteArray inProgressBuildIndexKey = inProgressBuildIndexMetastoreKey(indexId);
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index 5d6b616371..0fb71c8e5e 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -38,7 +38,7 @@ import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -64,7 +64,7 @@ import org.apache.ignite.network.ClusterService;
  * {@link TablePartitionId#tableId()}): </p>
  * <ul>
  *     <li>{@link CatalogEvent#INDEX_BUILDING} - starts building indexes for the corresponding local primary replicas.</li>
- *     <li>{@link CatalogEvent#INDEX_DROP} - stops building indexes for the corresponding local primary replicas.</li>
+ *     <li>{@link CatalogEvent#INDEX_REMOVED} - stops building indexes for the corresponding local primary replicas.</li>
  *     <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - for a new local primary replica, starts the building of all corresponding
  *     indexes, for an expired primary replica, stops the building of all corresponding indexes.</li>
  * </ul>
@@ -131,12 +131,12 @@ class IndexBuildController implements ManuallyCloseable {
             return onIndexBuilding((StartBuildingIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
-        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) -> {
+        catalogService.listen(CatalogEvent.INDEX_REMOVED, (parameters, exception) -> {
             if (exception != null) {
                 return failedFuture(exception);
             }
 
-            return onIndexDrop((DropIndexEventParameters) parameters).thenApply(unused -> false);
+            return onIndexRemoved((RemoveIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, (parameters, exception) -> {
@@ -174,7 +174,7 @@ class IndexBuildController implements ManuallyCloseable {
         });
     }
 
-    private CompletableFuture<?> onIndexDrop(DropIndexEventParameters parameters) {
+    private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             indexBuilder.stopBuildingIndexes(parameters.indexId());
 
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingStarterController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingStarterController.java
index 13c0148717..1b50ed8085 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingStarterController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingStarterController.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -50,7 +50,7 @@ import org.apache.ignite.network.ClusterService;
  * <br><p>Tasks are started and stopped based on events:</p>
  * <ul>
  *     <li>{@link CatalogEvent#INDEX_CREATE} - task starts for the new index.</li>
- *     <li>{@link CatalogEvent#INDEX_DROP} - stops task for the new index if it has been added.</li>
+ *     <li>{@link CatalogEvent#INDEX_REMOVED} - stops task for the new index if it has been added.</li>
  *     <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - when the local node becomes the primary replica for partition {@code 0} of
  *     the table, it starts tasks for its all new indexes, and when the primary replica expires, it stops all tasks for the table.</li>
  * </ul>
@@ -108,12 +108,12 @@ class IndexBuildingStarterController implements ManuallyCloseable {
             return onIndexCreate((CreateIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
-        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) -> {
+        catalogService.listen(CatalogEvent.INDEX_REMOVED, (parameters, exception) -> {
             if (exception != null) {
                 return failedFuture(exception);
             }
 
-            return onIndexDrop((DropIndexEventParameters) parameters).thenApply(unused -> false);
+            return onIndexRemoved((RemoveIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, (parameters, exception) -> {
@@ -139,10 +139,12 @@ class IndexBuildingStarterController implements ManuallyCloseable {
         });
     }
 
-    private CompletableFuture<?> onIndexDrop(DropIndexEventParameters parameters) {
+    private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             CatalogIndexDescriptor indexDescriptor = catalogService.index(parameters.indexId(), parameters.catalogVersion() - 1);
 
+            assert indexDescriptor != null : parameters.indexId();
+
             indexBuildingStarter.stopTask(indexDescriptor);
 
             return nullCompletedFuture();
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java
index 98517bbfa7..3247cf3dc6 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java
@@ -22,6 +22,7 @@ import static java.util.Collections.unmodifiableList;
 import static java.util.Comparator.comparingInt;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -39,8 +40,8 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
@@ -51,8 +52,8 @@ class IndexChooser implements ManuallyCloseable {
     private final CatalogService catalogService;
 
     /**
-     * Map that, for each key, contains a list of all dropped available table indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
-     * catalog versions.
+     * Map that, for each key, contains a list of all indexes that were available, but now are removed (sorted by {@link #INDEX_COMPARATOR})
+     * for all known catalog versions.
      *
      * <p>Examples below will be within the same table ID.</p>
      *
@@ -71,7 +72,7 @@ class IndexChooser implements ManuallyCloseable {
      *     3 -> [I1(A), I3(A)]
      * </pre>
      *
-     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting dropped available indexes}, we will return the following:</p>
+     * <p>Then, when {@link #getRemovedAvailableIndexes(int, int) getting removed available indexes}, we will return the following:</p>
      * <pre>
      *     0 -> []
      *     1 -> [I1(A)]
@@ -83,8 +84,8 @@ class IndexChooser implements ManuallyCloseable {
      * <p>Updated on {@link #recover() node recovery} and a catalog events processing.</p>
      */
     // TODO: IGNITE-20121 We may need to worry about parallel map changes when deleting catalog version
-    // TODO: IGNITE-20934 Worry about cleaning up dropped indexes earlier
-    private final NavigableMap<TableIdCatalogVersion, List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+    // TODO: IGNITE-20934 Worry about cleaning up removed indexes earlier
+    private final NavigableMap<TableIdCatalogVersion, List<CatalogIndexDescriptor>> removedAvailableTableIndexes
             = new ConcurrentSkipListMap<>();
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -126,8 +127,8 @@ class IndexChooser implements ManuallyCloseable {
                             : String.format("Table should not be dropped: [catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
 
                     for (CatalogIndexDescriptor tableIndex : tableIndexes) {
-                        if (tableIndex.status() == AVAILABLE && !contains(nextCatalogVersionTableIndexes, tableIndex)) {
-                            addDroppedAvailableIndex(tableIndex, nextCatalogVersion);
+                        if (tableIndex.status() == STOPPING && !contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addRemovedAvailableIndex(tableIndex, nextCatalogVersion);
                         }
                     }
                 }
@@ -143,13 +144,13 @@ class IndexChooser implements ManuallyCloseable {
 
         busyLock.block();
 
-        droppedAvailableTableIndexes.clear();
+        removedAvailableTableIndexes.clear();
     }
 
     /**
      * Collects a list of table indexes that will need to be used for an update operation in an RW transaction. The list consists of
-     * registered, building and available indexes on the requested catalog version, as well as all dropped available indexes from previous
-     * catalog versions.
+     * registered, building and available indexes on the requested catalog version, as well as all indexes that were available at
+     * some moment, but then were removed, from previous catalog versions.
      *
      * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. The table is expected to exist in the catalog at the requested
      * version.</p>
@@ -163,23 +164,23 @@ class IndexChooser implements ManuallyCloseable {
 
             assert !tableIndexes.isEmpty() : "catalogVersion=" + catalogVersion + ", tableId=" + tableId;
 
-            List<CatalogIndexDescriptor> droppedAvailableTableIndexes = getDroppedAvailableIndexes(catalogVersion, tableId);
+            List<CatalogIndexDescriptor> removedAvailableTableIndexes = getRemovedAvailableIndexes(catalogVersion, tableId);
 
-            if (droppedAvailableTableIndexes.isEmpty()) {
+            if (removedAvailableTableIndexes.isEmpty()) {
                 return tableIndexes;
             }
 
-            return unmodifiableList(merge(tableIndexes, droppedAvailableTableIndexes));
+            return unmodifiableList(merge(tableIndexes, removedAvailableTableIndexes));
         });
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) -> {
+        catalogService.listen(CatalogEvent.INDEX_REMOVED, (parameters, exception) -> {
             if (exception != null) {
                 return failedFuture(exception);
             }
 
-            return onDropIndex((DropIndexEventParameters) parameters).thenApply(unused -> false);
+            return onIndexRemoved((RemoveIndexEventParameters) parameters).thenApply(unused -> false);
         });
 
         catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, exception) -> {
@@ -191,19 +192,19 @@ class IndexChooser implements ManuallyCloseable {
         });
     }
 
-    private CompletableFuture<?> onDropIndex(DropIndexEventParameters parameters) {
+    private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             int previousCatalogVersion = parameters.catalogVersion() - 1;
 
-            CatalogIndexDescriptor droppedIndexDescriptor = catalogService.index(parameters.indexId(), previousCatalogVersion);
+            CatalogIndexDescriptor removedIndexDescriptor = catalogService.index(parameters.indexId(), previousCatalogVersion);
 
-            assert droppedIndexDescriptor != null : "indexId=" + parameters.indexId() + ", catalogVersion=" + previousCatalogVersion;
+            assert removedIndexDescriptor != null : "indexId=" + parameters.indexId() + ", catalogVersion=" + previousCatalogVersion;
 
-            if (droppedIndexDescriptor.status() != AVAILABLE) {
+            if (removedIndexDescriptor.status() != AVAILABLE && removedIndexDescriptor.status() != STOPPING) {
                 return nullCompletedFuture();
             }
 
-            addDroppedAvailableIndex(droppedIndexDescriptor, parameters.catalogVersion());
+            addRemovedAvailableIndex(removedIndexDescriptor, parameters.catalogVersion());
 
             return nullCompletedFuture();
         });
@@ -211,62 +212,62 @@ class IndexChooser implements ManuallyCloseable {
 
     private CompletableFuture<?> onDropTable(DropTableEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            // We can remove dropped indexes on table drop as we need such indexes only for writing, and write operations will be denied
+            // We can discard removed indexes on table drop as we need such indexes only for writing, and write operations will be denied
             // right after a table drop has been activated.
-            droppedAvailableTableIndexes.entrySet().removeIf(entry -> parameters.tableId() == entry.getKey().tableId);
+            removedAvailableTableIndexes.entrySet().removeIf(entry -> parameters.tableId() == entry.getKey().tableId);
 
             return nullCompletedFuture();
         });
     }
 
     /**
-     * Returns a list of dropped available indexes (sorted by {@link #INDEX_COMPARATOR}) for the catalog version of interest from
-     * {@link #droppedAvailableTableIndexes}. If there is no list for the requested catalog version, the closest previous catalog version
-     * will be returned.
+     * Returns a list of indexes that are/were available, but then were removed (sorted by {@link #INDEX_COMPARATOR}) for the catalog
+     * version of interest from {@link #removedAvailableTableIndexes}. If there is no list for the requested catalog version, the closest
+     * previous catalog version will be returned.
      */
-    private List<CatalogIndexDescriptor> getDroppedAvailableIndexes(int catalogVersion, int tableId) {
+    private List<CatalogIndexDescriptor> getRemovedAvailableIndexes(int catalogVersion, int tableId) {
         var key = new TableIdCatalogVersion(tableId, catalogVersion);
 
-        Entry<TableIdCatalogVersion, List<CatalogIndexDescriptor>> entry = droppedAvailableTableIndexes.floorEntry(key);
+        Entry<TableIdCatalogVersion, List<CatalogIndexDescriptor>> entry = removedAvailableTableIndexes.floorEntry(key);
 
         return entry != null && tableId == entry.getKey().tableId ? entry.getValue() : List.of();
     }
 
     /**
-     * Adds the dropped available index to {@link #droppedAvailableTableIndexes}.
+     * Adds a removed index that is (or was) available to {@link #removedAvailableTableIndexes}.
      *
      * <p>If the list is missing for the catalog version from the arguments, then we create it by merging the indexes from the previous
      * catalog version and the new index. Otherwise, we simply add to the existing list. Lists are sorted by {@link #INDEX_COMPARATOR}.</p>
      *
-     * @param droppedIndex Drooped index.
-     * @param catalogVersion Catalog version on which the index was dropped.
+     * @param removedIndex Removed index.
+     * @param catalogVersion Catalog version on which the index was removed.
      */
-    private void addDroppedAvailableIndex(CatalogIndexDescriptor droppedIndex, int catalogVersion) {
-        assert droppedIndex.status() == AVAILABLE : droppedIndex.id();
+    private void addRemovedAvailableIndex(CatalogIndexDescriptor removedIndex, int catalogVersion) {
+        assert removedIndex.status() == AVAILABLE || removedIndex.status() == STOPPING : removedIndex.id();
 
-        int tableId = droppedIndex.tableId();
+        int tableId = removedIndex.tableId();
 
         // For now, there is no need to worry about parallel changes to the map, it will change on recovery and in catalog event listeners
         // and won't interfere with each other.
         // TODO: IGNITE-20121 We may need to worry about parallel map changes when deleting catalog version
-        List<CatalogIndexDescriptor> previousCatalogVersionDroppedIndexes = getDroppedAvailableIndexes(catalogVersion - 1, tableId);
+        List<CatalogIndexDescriptor> previousCatalogVersionRemovedIndexes = getRemovedAvailableIndexes(catalogVersion - 1, tableId);
 
-        droppedAvailableTableIndexes.compute(
+        removedAvailableTableIndexes.compute(
                 new TableIdCatalogVersion(tableId, catalogVersion),
-                (tableIdCatalogVersion, droppedAvailableIndexes) -> {
+                (tableIdCatalogVersion, removedAvailableIndexes) -> {
                     List<CatalogIndexDescriptor> res;
 
-                    if (droppedAvailableIndexes == null) {
-                        res = new ArrayList<>(1 + previousCatalogVersionDroppedIndexes.size());
+                    if (removedAvailableIndexes == null) {
+                        res = new ArrayList<>(1 + previousCatalogVersionRemovedIndexes.size());
 
-                        res.addAll(previousCatalogVersionDroppedIndexes);
+                        res.addAll(previousCatalogVersionRemovedIndexes);
                     } else {
-                        res = new ArrayList<>(1 + droppedAvailableIndexes.size());
+                        res = new ArrayList<>(1 + removedAvailableIndexes.size());
 
-                        res.addAll(droppedAvailableIndexes);
+                        res.addAll(removedAvailableIndexes);
                     }
 
-                    res.add(droppedIndex);
+                    res.add(removedIndex);
 
                     res.sort(INDEX_COMPARATOR);
 
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java
index ad5a8e590c..2e51d0e220 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java
@@ -188,9 +188,7 @@ class IndexManagementUtils {
      * @param catalogVersion Catalog version.
      */
     static int getPartitionCountFromCatalog(CatalogService catalogService, int indexId, int catalogVersion) {
-        CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, catalogVersion);
-
-        assert indexDescriptor != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+        CatalogIndexDescriptor indexDescriptor = index(catalogService, indexId, catalogVersion);
 
         CatalogTableDescriptor tableDescriptor = catalogService.table(indexDescriptor.tableId(), catalogVersion);
 
@@ -203,6 +201,20 @@ class IndexManagementUtils {
         return zoneDescriptor.partitions();
     }
 
+    /**
+     * Finds an index by ID in the requested catalog version. Throws if it does not exist.
+     *
+     * @param catalogService Catalog service to be used to find the index.
+     * @param indexId ID of the index to find.
+     * @param catalogVersion Version of the catalog in which to look for the index.
+     */
+    static CatalogIndexDescriptor index(CatalogService catalogService, int indexId, int catalogVersion) {
+        CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, catalogVersion);
+
+        assert indexDescriptor != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+        return indexDescriptor;
+    }
+
     /**
      * Makes the index available in the catalog, does not return the future execution of the operation, so as not to create dead locks when
      * performing the operation and the inability to complete it due to execution in the metastore thread or on recovery (the metastore
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index fd2c1082c9..4320204b76 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -21,6 +21,8 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
+import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_STOPPING;
+import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -39,12 +41,15 @@ import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongFunction;
+import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -84,8 +89,9 @@ public class IndexManager implements IgniteComponent {
     /** Table manager. */
     private final TableManager tableManager;
 
+    // TODO: IGNITE-21117 - change the field type back to CatalogService.
     /** Catalog service. */
-    private final CatalogService catalogService;
+    private final CatalogManager catalogManager;
 
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageManager;
@@ -107,18 +113,18 @@ public class IndexManager implements IgniteComponent {
      *
      * @param schemaManager Schema manager.
      * @param tableManager Table manager.
-     * @param catalogService Catalog manager.
+     * @param catalogManager Catalog manager.
      */
     public IndexManager(
             SchemaManager schemaManager,
             TableManager tableManager,
-            CatalogService catalogService,
+            CatalogManager catalogManager,
             MetaStorageManager metaStorageManager,
             Consumer<LongFunction<CompletableFuture<?>>> registry
     ) {
         this.schemaManager = schemaManager;
         this.tableManager = tableManager;
-        this.catalogService = catalogService;
+        this.catalogManager = catalogManager;
         this.metaStorageManager = metaStorageManager;
 
         startVv = new IncrementalVersionedValue<>(registry);
@@ -131,7 +137,7 @@ public class IndexManager implements IgniteComponent {
 
         startIndexes();
 
-        catalogService.listen(INDEX_CREATE, (parameters, exception) -> {
+        catalogManager.listen(INDEX_CREATE, (parameters, exception) -> {
             if (exception != null) {
                 return failedFuture(exception);
             }
@@ -139,11 +145,31 @@ public class IndexManager implements IgniteComponent {
             return onIndexCreate((CreateIndexEventParameters) parameters);
         });
 
+        // TODO: IGNITE-21117 - remove this listener, which requests index removal as soon as the index starts STOPPING.
+        catalogManager.listen(INDEX_STOPPING, (parameters, exception) -> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            removeIndex(((StoppingIndexEventParameters) parameters).indexId());
+
+            return falseCompletedFuture();
+        });
+
         LOG.info("Index manager started");
 
         return nullCompletedFuture();
     }
 
+    private CompletableFuture<Void> removeIndex(int indexId) {
+        return catalogManager.execute(RemoveIndexCommand.builder().indexId(indexId).build())
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        LOG.error("Cannot remove a dropped index [indexId={}]", ex, indexId);
+                    }
+                });
+    }
+
     @Override
     public void stop() throws Exception {
         LOG.debug("Index manager is about to stop");
@@ -178,7 +204,7 @@ public class IndexManager implements IgniteComponent {
     }
 
     // TODO: IGNITE-20121 Unregister index only before we physically start deleting the index before truncate catalog
-    private CompletableFuture<Boolean> onIndexDrop(DropIndexEventParameters parameters) {
+    private CompletableFuture<Boolean> onIndexDrop(StoppingIndexEventParameters parameters) {
         int indexId = parameters.indexId();
         int tableId = parameters.tableId();
 
@@ -211,7 +237,7 @@ public class IndexManager implements IgniteComponent {
             long causalityToken = parameters.causalityToken();
             int catalogVersion = parameters.catalogVersion();
 
-            CatalogTableDescriptor table = catalogService.table(tableId, catalogVersion);
+            CatalogTableDescriptor table = catalogManager.table(tableId, catalogVersion);
 
             assert table != null : "tableId=" + tableId + ", indexId=" + indexId;
 
@@ -320,10 +346,17 @@ public class IndexManager implements IgniteComponent {
 
         List<CompletableFuture<?>> startIndexFutures = new ArrayList<>();
 
-        for (Entry<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> e : collectIndexesForRecovery(catalogService).entrySet()) {
+        Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> indexesForRecovery = collectIndexesForRecovery(catalogManager);
+        for (Entry<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> e : indexesForRecovery.entrySet()) {
             CatalogTableDescriptor table = e.getKey();
 
             for (CatalogIndexDescriptor index : e.getValue()) {
+                // TODO: IGNITE-21117 - remove this: removes STOPPING indexes still present in the latest catalog.
+                if (index.status() == CatalogIndexStatus.STOPPING
+                        && catalogManager.index(index.id(), catalogManager.latestCatalogVersion()) != null) {
+                    startIndexFutures.add(removeIndex(index.id()));
+                }
+
                 startIndexFutures.add(startIndexAsync(table, index, causalityToken));
             }
         }
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java
index 45db3e1efc..c1b47ef863 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexChooserTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
 import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
 import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
 import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -189,11 +190,32 @@ public class IndexChooserTest extends BaseIgniteAbstractTest {
         createIndex(INDEX_NAME);
         startBuildingIndex(INDEX_NAME);
         makeIndexAvailable(INDEX_NAME);
+        dropIndex(INDEX_NAME);
+
+        int catalogVersion = catalogManager.latestCatalogVersion();
 
-        int catalogVersionAfterMakeIndexAvailable = catalogManager.latestCatalogVersion();
+        if (withRecovery) {
+            recoverIndexCollector();
+        }
 
+        assertThat(
+                chooseForRwTxOperation(catalogVersion),
+                contains(index(catalogVersion, PK_INDEX_NAME), index(catalogVersion, INDEX_NAME))
+        );
+    }
+
+    @ParameterizedTest(name = "withRecovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void testChooseForRwTxOperationAfterRemoveStoppedIndex(boolean withRecovery) {
+        createIndex(INDEX_NAME);
+        startBuildingIndex(INDEX_NAME);
+        makeIndexAvailable(INDEX_NAME);
         dropIndex(INDEX_NAME);
 
+        int catalogVersionAfterDropIndex = catalogManager.latestCatalogVersion();
+
+        removeIndex(INDEX_NAME);
+
         int catalogVersion = catalogManager.latestCatalogVersion();
 
         if (withRecovery) {
@@ -202,7 +224,7 @@ public class IndexChooserTest extends BaseIgniteAbstractTest {
 
         assertThat(
                 chooseForRwTxOperation(catalogVersion),
-                contains(index(catalogVersion, PK_INDEX_NAME), index(catalogVersionAfterMakeIndexAvailable, INDEX_NAME))
+                contains(index(catalogVersion, PK_INDEX_NAME), index(catalogVersionAfterDropIndex, INDEX_NAME))
         );
     }
 
@@ -241,15 +263,21 @@ public class IndexChooserTest extends BaseIgniteAbstractTest {
                 toStartBuildingIndexCommand(indexName5)
         );
 
-        int catalogVersionBeforeDropIndex4And5 = catalogManager.latestCatalogVersion();
+        // after execute: I0(A) I1(A) I3(B) I4(S)
+        executeCatalogCommands(toDropIndexCommand(indexName4), toDropIndexCommand(indexName5));
+
+        int catalogVersionBeforeRemoveIndex4 = catalogManager.latestCatalogVersion();
 
         // after execute: I0(A) I1(A) I3(B)
-        executeCatalogCommands(toDropIndexCommand(indexName4), toDropIndexCommand(indexName5));
+        executeCatalogCommands(toRemoveIndexCommand(indexName4));
 
-        int catalogVersionBeforeDropIndex1 = catalogManager.latestCatalogVersion();
+        // after execute: I0(A) I1(S) I3(B)
+        executeCatalogCommands(toDropIndexCommand(indexName1));
+
+        int catalogVersionBeforeRemoveIndex1 = catalogManager.latestCatalogVersion();
 
         // after execute: I0(A) I3(B)
-        executeCatalogCommands(toDropIndexCommand(indexName1));
+        executeCatalogCommands(toRemoveIndexCommand(indexName1));
 
         // after execute: I0(A) I3(B) I6(R)
         executeCatalogCommands(toCreateHashIndexCommand(indexName6));
@@ -265,9 +293,9 @@ public class IndexChooserTest extends BaseIgniteAbstractTest {
                 chooseForRwTxOperation(catalogVersion),
                 contains(
                         index(catalogVersion, PK_INDEX_NAME),                   // Alive available index0 (pk)
-                        index(catalogVersionBeforeDropIndex1, indexName1),      // Dropped available index1
+                        index(catalogVersionBeforeRemoveIndex1, indexName1),    // Removed available index1
                         index(catalogVersion, indexName3),                      // Building index3
-                        index(catalogVersionBeforeDropIndex4And5, indexName4),  // Dropped available index4
+                        index(catalogVersionBeforeRemoveIndex4, indexName4),    // Removed available index4
                         index(catalogVersion, indexName6)                       // Registered index6
                 )
         );
@@ -293,6 +321,10 @@ public class IndexChooserTest extends BaseIgniteAbstractTest {
         TestIndexManagementUtils.dropIndex(catalogManager, indexName);
     }
 
+    private void removeIndex(String indexName) {
+        TestIndexManagementUtils.removeIndex(catalogManager, indexName);
+    }
+
     private List<CatalogIndexDescriptor> chooseForRwTxOperation(int catalogVersion) {
         return indexChooser.chooseForRwTxUpdateOperation(catalogVersion, tableId);
     }
@@ -348,4 +380,10 @@ public class IndexChooserTest extends BaseIgniteAbstractTest {
                 .indexName(indexName)
                 .build();
     }
+
+    private CatalogCommand toRemoveIndexCommand(String indexName) {
+        return RemoveIndexCommand.builder()
+                .indexId(indexId(catalogManager, indexName, clock))
+                .build();
+    }
 }
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 9b49cdff0b..53f1d23f70 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.IgniteSql;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -415,6 +416,10 @@ public class IndexManagerTest extends BaseIgniteAbstractTest {
         return CatalogTestUtils.index(catalogManager, catalogVersion, indexName);
     }
 
+    private @Nullable CatalogIndexDescriptor indexOrNull(int catalogVersion, String indexName) {
+        return CatalogTestUtils.indexOrNull(catalogManager, catalogVersion, indexName);
+    }
+
     private void createTable(String tableName) {
         TestIndexManagementUtils.createTable(catalogManager, tableName, COLUMN_NAME);
     }
@@ -465,9 +470,14 @@ public class IndexManagerTest extends BaseIgniteAbstractTest {
         var res = new ArrayList<CatalogIndexDescriptor>(indexNames.length);
 
         for (String indexName : indexNames) {
-            res.add(index(catalogManager.latestCatalogVersion(), indexName));
+            int versionBeforeDrop = catalogManager.latestCatalogVersion();
+            CatalogIndexDescriptor indexBeforeDropping = index(versionBeforeDrop, indexName);
 
             dropIndex(indexName);
+
+            CatalogIndexDescriptor indexAfterDropping = indexOrNull(versionBeforeDrop + 1, indexName);
+
+            res.add(indexAfterDropping != null ? indexAfterDropping : indexBeforeDropping);
         }
 
         return res;
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
index 87486c399d..7b64d870b3 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
@@ -91,6 +91,10 @@ class TestIndexManagementUtils {
         TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
     }
 
+    static void removeIndex(CatalogManager catalogManager, String indexName) {
+        TableTestUtils.removeIndex(catalogManager, indexName);
+    }
+
     static int indexId(CatalogService catalogService, String indexName, HybridClock clock) {
         return TableTestUtils.getIndexIdStrict(catalogService, indexName, clock.nowLong());
     }
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 72a46ee042..3f94c30f58 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
 import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
 import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.jetbrains.annotations.Nullable;
@@ -95,6 +96,21 @@ public class TableTestUtils {
         );
     }
 
+    /**
+     * Removes the index with the given name from the catalog by executing a {@link RemoveIndexCommand}.
+     *
+     * @param catalogManager Catalog manager.
+     * @param indexName Index name.
+     */
+    public static void removeIndex(CatalogManager catalogManager, String indexName) {
+        int indexId = getIndexIdStrict(catalogManager, indexName, Long.MAX_VALUE);
+
+        assertThat(
+                catalogManager.execute(RemoveIndexCommand.builder().indexId(indexId).build()),
+                willCompleteSuccessfully()
+        );
+    }
+
     /**
      * Creates hash index in the catalog.
      *