You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/06/09 14:27:57 UTC

[ignite-3] branch main updated: IGNITE-19642 CatalogService should use HybridClock to register new Catalog version (#2154)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 85b18910ee IGNITE-19642 CatalogService should use HybridClock to register new Catalog version (#2154)
85b18910ee is described below

commit 85b18910ee83dc52c05277997d24408741a15e78
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Fri Jun 9 17:27:51 2023 +0300

    IGNITE-19642 CatalogService should use HybridClock to register new Catalog version (#2154)
---
 .../internal/catalog/CatalogServiceImpl.java       |  66 ++++--
 .../internal/catalog/storage/VersionedUpdate.java  |  11 +-
 .../internal/catalog/CatalogServiceSelfTest.java   | 236 +++++++++++----------
 .../catalog/storage/UpdateLogImplTest.java         |   6 +-
 .../storage/ItRebalanceDistributedTest.java        |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 7 files changed, 186 insertions(+), 139 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 8b3acbcd25..133904021e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.catalog.storage.UpdateEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateLog;
 import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
 import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.Producer;
@@ -112,6 +113,9 @@ import org.jetbrains.annotations.Nullable;
 public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParameters> implements CatalogManager {
     private static final int MAX_RETRY_COUNT = 10;
 
+    /** Safe time to wait before new Catalog version activation. */
+    private static final int DEFAULT_DELAY_DURATION = 0;
+
     /** The logger. */
     private static final IgniteLogger LOG = Loggers.forClass(CatalogServiceImpl.class);
 
@@ -125,11 +129,24 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
 
+    private final HybridClock clock;
+
+    private final long delayDurationMs;
+
+    /**
+     * Constructor.
+     */
+    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
+        this(updateLog, clock, DEFAULT_DELAY_DURATION);
+    }
+
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog) {
+    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long delayDurationMs) {
         this.updateLog = updateLog;
+        this.clock = clock;
+        this.delayDurationMs = delayDurationMs;
     }
 
     /** {@inheritDoc} */
@@ -767,8 +784,10 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
         }
 
         int newVersion = catalog.version() + 1;
+        //TODO https://issues.apache.org/jira/browse/IGNITE-19209 Make activation time in the MS entry strictly equal to MS entry ts+DD
+        long activationTimestamp = activationTimestamp();
 
-        return updateLog.append(new VersionedUpdate(newVersion, updates))
+        return updateLog.append(new VersionedUpdate(newVersion, activationTimestamp, updates))
                 .thenCompose(result -> versionTracker.waitFor(newVersion).thenApply(none -> result))
                 .thenCompose(result -> {
                     if (result) {
@@ -783,6 +802,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
         @Override
         public void handle(VersionedUpdate update) {
             int version = update.version();
+            long activationTimestamp = update.activationTimestamp();
             Catalog catalog = catalogByVer.get(version - 1);
 
             assert catalog != null;
@@ -796,7 +816,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
                 if (entry instanceof NewTableEntry) {
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -817,7 +837,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -838,7 +858,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -869,7 +889,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -898,7 +918,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
                 } else if (entry instanceof NewIndexEntry) {
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -918,7 +938,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -936,7 +956,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
                 } else if (entry instanceof NewZoneEntry) {
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             CollectionUtils.concat(catalog.zones(), List.of(((NewZoneEntry) entry).descriptor())),
                             catalog.schemas()
@@ -951,7 +971,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones().stream().filter(z -> z.id() != zoneId).collect(Collectors.toList()),
                             catalog.schemas()
@@ -966,7 +986,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones().stream()
                                     .map(z -> z.id() == descriptor.id() ? descriptor : z)
@@ -981,15 +1001,10 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
                 } else if (entry instanceof ObjectIdGenUpdateEntry) {
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState() + ((ObjectIdGenUpdateEntry) entry).delta(),
                             catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    schema.tables(),
-                                    schema.indexes()
-                            ))
+                            catalog.schemas()
                     );
                 } else if (entry instanceof AlterColumnEntry) {
                     int tableId = ((AlterColumnEntry) entry).tableId();
@@ -997,7 +1012,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
                     catalog = new Catalog(
                             version,
-                            System.currentTimeMillis(),
+                            activationTimestamp,
                             catalog.objectIdGenState(),
                             catalog.zones(),
                             List.of(new CatalogSchemaDescriptor(
@@ -1033,17 +1048,24 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
             registerCatalog(catalog);
 
             CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new))
-                    .thenRun(() -> versionTracker.update(version, null))
                     .whenComplete((ignore, err) -> {
                         if (err != null) {
                             LOG.warn("Failed to apply catalog update.", err);
-                        } else {
-                            versionTracker.update(version, null);
+                            //TODO: IGNITE-14611 Pass exception to an error handler because catalog got into inconsistent state.
                         }
+
+                        versionTracker.update(version, null);
                     });
         }
     }
 
+    /**
+     * Calculate catalog activation timestamp.
+     */
+    private long activationTimestamp() {
+        return clock.now().addPhysicalTime(delayDurationMs).longValue();
+    }
+
     private static void throwUnsupportedDdl(String msg, Object... params) {
         throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, IgniteStringFormatter.format(msg, params));
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
index 195c19c760..437534ccf9 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
@@ -31,6 +31,8 @@ public class VersionedUpdate implements Serializable {
 
     private final int version;
 
+    private final long activationTimestamp;
+
     @IgniteToStringInclude
     private final List<UpdateEntry> entries;
 
@@ -38,10 +40,12 @@ public class VersionedUpdate implements Serializable {
      * Constructs the object.
      *
      * @param version A version the changes relate to.
+     * @param activationTimestamp Timestamp given changes become active at.
      * @param entries A list of changes.
      */
-    public VersionedUpdate(int version, List<UpdateEntry> entries) {
+    public VersionedUpdate(int version, long activationTimestamp, List<UpdateEntry> entries) {
         this.version = version;
+        this.activationTimestamp = activationTimestamp;
         this.entries = List.copyOf(
                 Objects.requireNonNull(entries, "entries")
         );
@@ -52,6 +56,11 @@ public class VersionedUpdate implements Serializable {
         return version;
     }
 
+    /** Returns activation timestamp. */
+    public long activationTimestamp() {
+        return activationTimestamp;
+    }
+
     /** Returns list of changes. */
     public List<UpdateEntry> entries() {
         return entries;
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 91ffa02264..2d09e3f26c 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -83,6 +83,8 @@ import org.apache.ignite.internal.catalog.storage.UpdateLog;
 import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
@@ -131,6 +133,8 @@ public class CatalogServiceSelfTest {
 
     private CatalogServiceImpl service;
 
+    private HybridClock clock;
+
     @BeforeEach
     void setUp() throws NodeStoppingException {
         vault = new VaultManager(new InMemoryVaultService());
@@ -139,7 +143,8 @@ public class CatalogServiceSelfTest {
                 vault, new SimpleInMemoryKeyValueStorage("test")
         );
 
-        service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault));
+        clock = new HybridClockImpl();
+        service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault), clock, 0L);
 
         vault.start();
         metastore.start();
@@ -157,14 +162,14 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testEmptyCatalog() {
-        assertNotNull(service.activeSchema(System.currentTimeMillis()));
+        assertNotNull(service.activeSchema(clock.nowLong()));
         assertNotNull(service.schema(0));
 
         assertNull(service.schema(1));
         assertThrows(IllegalStateException.class, () -> service.activeSchema(-1L));
 
-        assertNull(service.table(0, System.currentTimeMillis()));
-        assertNull(service.index(0, System.currentTimeMillis()));
+        assertNull(service.table(0, clock.nowLong()));
+        assertNull(service.index(0, clock.nowLong()));
 
         CatalogSchemaDescriptor schema = service.schema(0);
         assertEquals(SCHEMA_NAME, schema.name());
@@ -173,7 +178,7 @@ public class CatalogServiceSelfTest {
         assertEquals(0, schema.tables().length);
         assertEquals(0, schema.indexes().length);
 
-        CatalogZoneDescriptor zone = service.zone(1, System.currentTimeMillis());
+        CatalogZoneDescriptor zone = service.zone(1, clock.nowLong());
         assertEquals(CatalogService.DEFAULT_ZONE_NAME, zone.name());
 
         assertEquals(1, zone.id());
@@ -223,10 +228,10 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertEquals(0, schema.id());
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+        assertSame(schema, service.activeSchema(clock.nowLong()));
 
-        assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis()));
-        assertSame(schema.table(TABLE_NAME), service.table(2, System.currentTimeMillis()));
+        assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, clock.nowLong()));
+        assertSame(schema.table(TABLE_NAME), service.table(2, clock.nowLong()));
 
         // Validate newly created table
         CatalogTableDescriptor table = schema.table(TABLE_NAME);
@@ -245,13 +250,13 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertEquals(0, schema.id());
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+        assertSame(schema, service.activeSchema(clock.nowLong()));
 
-        assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis()));
-        assertSame(schema.table(TABLE_NAME), service.table(2, System.currentTimeMillis()));
+        assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, clock.nowLong()));
+        assertSame(schema.table(TABLE_NAME), service.table(2, clock.nowLong()));
 
-        assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, System.currentTimeMillis()));
-        assertSame(schema.table(TABLE_NAME_2), service.table(3, System.currentTimeMillis()));
+        assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, clock.nowLong()));
+        assertSame(schema.table(TABLE_NAME_2), service.table(3, clock.nowLong()));
 
         assertNotSame(schema.table(TABLE_NAME), schema.table(TABLE_NAME_2));
 
@@ -259,17 +264,15 @@ public class CatalogServiceSelfTest {
         assertThat(service.createTable(simpleTable(TABLE_NAME_2)), willThrowFast(TableAlreadyExistsException.class));
 
         // Validate schema wasn't changed.
-        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+        assertSame(schema, service.activeSchema(clock.nowLong()));
     }
 
     @Test
-    public void testDropTable() throws InterruptedException {
+    public void testDropTable() {
         assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
         assertThat(service.createTable(simpleTable(TABLE_NAME_2)), willBe((Object) null));
 
-        long beforeDropTimestamp = System.currentTimeMillis();
-
-        Thread.sleep(5);
+        long beforeDropTimestamp = clock.nowLong();
 
         DropTableParams dropTableParams = DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
 
@@ -295,24 +298,24 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertEquals(0, schema.id());
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+        assertSame(schema, service.activeSchema(clock.nowLong()));
 
         assertNull(schema.table(TABLE_NAME));
-        assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
-        assertNull(service.table(2, System.currentTimeMillis()));
+        assertNull(service.table(TABLE_NAME, clock.nowLong()));
+        assertNull(service.table(2, clock.nowLong()));
 
-        assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, System.currentTimeMillis()));
-        assertSame(schema.table(TABLE_NAME_2), service.table(3, System.currentTimeMillis()));
+        assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, clock.nowLong()));
+        assertSame(schema.table(TABLE_NAME_2), service.table(3, clock.nowLong()));
 
         // Try to drop table once again.
         assertThat(service.dropTable(dropTableParams), willThrowFast(TableNotFoundException.class));
 
         // Validate schema wasn't changed.
-        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+        assertSame(schema, service.activeSchema(clock.nowLong()));
     }
 
     @Test
-    public void testAddColumn() throws InterruptedException {
+    public void testAddColumn() {
         assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
 
         AlterTableAddColumnParams params = AlterTableAddColumnParams.builder()
@@ -327,9 +330,7 @@ public class CatalogServiceSelfTest {
                 ))
                 .build();
 
-        long beforeAddedTimestamp = System.currentTimeMillis();
-
-        Thread.sleep(5);
+        long beforeAddedTimestamp = clock.nowLong();
 
         assertThat(service.addColumn(params), willBe((Object) null));
 
@@ -341,7 +342,7 @@ public class CatalogServiceSelfTest {
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
 
         // Validate actual catalog
-        schema = service.activeSchema(System.currentTimeMillis());
+        schema = service.activeSchema(clock.nowLong());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
 
@@ -361,7 +362,7 @@ public class CatalogServiceSelfTest {
     }
 
     @Test
-    public void testDropColumn() throws InterruptedException {
+    public void testDropColumn() {
         assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
 
         // Validate dropping column
@@ -371,9 +372,7 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of("VAL"))
                 .build();
 
-        long beforeAddedTimestamp = System.currentTimeMillis();
-
-        Thread.sleep(5);
+        long beforeAddedTimestamp = clock.nowLong();
 
         assertThat(service.dropColumn(params), willBe((Object) null));
 
@@ -385,7 +384,7 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema.table(TABLE_NAME).column("VAL"));
 
         // Validate actual catalog
-        schema = service.activeSchema(System.currentTimeMillis());
+        schema = service.activeSchema(clock.nowLong());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
 
@@ -394,7 +393,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateDropColumnIfTableNotExists() {
-        assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
+        assertNull(service.table(TABLE_NAME, clock.nowLong()));
 
         // Try to add a new column.
         AlterTableAddColumnParams addColumnParams = AlterTableAddColumnParams.builder()
@@ -439,7 +438,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.dropColumn(params), willThrow(SqlException.class));
 
         // Validate actual catalog
-        CatalogSchemaDescriptor schema = service.activeSchema(System.currentTimeMillis());
+        CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
         assertSame(service.schema(2), schema);
@@ -465,7 +464,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.addColumn(addColumnParams), willThrow(ColumnAlreadyExistsException.class));
 
         // Validate no column added.
-        CatalogSchemaDescriptor schema = service.activeSchema(System.currentTimeMillis());
+        CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
 
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
 
@@ -482,7 +481,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.addColumn(addColumnParams), willBe((Object) null));
 
         // Validate both columns added.
-        schema = service.activeSchema(System.currentTimeMillis());
+        schema = service.activeSchema(clock.nowLong());
 
         assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
         assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -497,7 +496,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.dropColumn(dropColumnParams), willBe((Object) null));
 
         // Validate both columns dropped.
-        schema = service.activeSchema(System.currentTimeMillis());
+        schema = service.activeSchema(clock.nowLong());
 
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -512,7 +511,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.dropColumn(dropColumnParams), willThrow(ColumnNotFoundException.class));
 
         // Validate no column dropped.
-        schema = service.activeSchema(System.currentTimeMillis());
+        schema = service.activeSchema(clock.nowLong());
 
         assertNotNull(schema.table(TABLE_NAME).column("VAL"));
     }
@@ -902,7 +901,7 @@ public class CatalogServiceSelfTest {
     }
 
     @Test
-    public void testDropTableWithIndex() throws InterruptedException {
+    public void testDropTableWithIndex() {
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .indexName(INDEX_NAME)
                 .tableName(TABLE_NAME)
@@ -912,9 +911,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
         assertThat(service.createIndex(params), willBe((Object) null));
 
-        long beforeDropTimestamp = System.currentTimeMillis();
-
-        Thread.sleep(5);
+        long beforeDropTimestamp = clock.nowLong();
 
         DropTableParams dropTableParams = DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build();
 
@@ -940,15 +937,15 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertEquals(0, schema.id());
         assertEquals(CatalogService.PUBLIC, schema.name());
-        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+        assertSame(schema, service.activeSchema(clock.nowLong()));
 
         assertNull(schema.table(TABLE_NAME));
-        assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
-        assertNull(service.table(2, System.currentTimeMillis()));
+        assertNull(service.table(TABLE_NAME, clock.nowLong()));
+        assertNull(service.table(2, clock.nowLong()));
 
         assertNull(schema.index(INDEX_NAME));
-        assertNull(service.index(INDEX_NAME, System.currentTimeMillis()));
-        assertNull(service.index(3, System.currentTimeMillis()));
+        assertNull(service.index(INDEX_NAME, clock.nowLong()));
+        assertNull(service.index(3, clock.nowLong()));
     }
 
     @Test
@@ -975,9 +972,9 @@ public class CatalogServiceSelfTest {
         schema = service.schema(2);
 
         assertNotNull(schema);
-        assertNull(service.index(1, System.currentTimeMillis()));
-        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis()));
-        assertSame(schema.index(INDEX_NAME), service.index(3, System.currentTimeMillis()));
+        assertNull(service.index(1, clock.nowLong()));
+        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, clock.nowLong()));
+        assertSame(schema.index(INDEX_NAME), service.index(3, clock.nowLong()));
 
         // Validate newly created hash index
         CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) schema.index(INDEX_NAME);
@@ -1016,9 +1013,9 @@ public class CatalogServiceSelfTest {
         schema = service.schema(2);
 
         assertNotNull(schema);
-        assertNull(service.index(1, System.currentTimeMillis()));
-        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis()));
-        assertSame(schema.index(INDEX_NAME), service.index(3, System.currentTimeMillis()));
+        assertNull(service.index(1, clock.nowLong()));
+        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, clock.nowLong()));
+        assertSame(schema.index(INDEX_NAME), service.index(3, clock.nowLong()));
 
         // Validate newly created sorted index
         CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) schema.index(INDEX_NAME);
@@ -1056,7 +1053,7 @@ public class CatalogServiceSelfTest {
 
         doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock);
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, clock);
         service.start();
 
         when(updateLogMock.append(any())).thenAnswer(invocation -> {
@@ -1067,6 +1064,7 @@ public class CatalogServiceSelfTest {
 
             VersionedUpdate update = new VersionedUpdate(
                     updateFromInvocation.version(),
+                    updateFromInvocation.activationTimestamp(),
                     List.of(new ObjectIdGenUpdateEntry(1))
             );
 
@@ -1080,14 +1078,61 @@ public class CatalogServiceSelfTest {
         assertThat(createTableFut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
 
         // retry limit is hardcoded at org.apache.ignite.internal.catalog.CatalogServiceImpl.MAX_RETRY_COUNT
-        Mockito.verify(updateLogMock, times(10)).append(any());
+        verify(updateLogMock, times(10)).append(any());
+    }
+
+    @Test
+    public void catalogActivationTime() throws Exception {
+        final int delayDuration = 3_000;
+
+        InMemoryVaultService vaultService = new InMemoryVaultService();
+        VaultManager vault = new VaultManager(vaultService);
+        StandaloneMetaStorageManager metaStorageManager = StandaloneMetaStorageManager.create(vault);
+        UpdateLog updateLogMock = Mockito.spy(new UpdateLogImpl(metaStorageManager, vault));
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, clock, delayDuration);
+
+        vault.start();
+        metaStorageManager.start();
+        service.start();
+
+        metaStorageManager.deployWatches();
+
+        try {
+            CreateTableParams params = CreateTableParams.builder()
+                    .schemaName(SCHEMA_NAME)
+                    .tableName(TABLE_NAME)
+                    .columns(List.of(
+                            ColumnParams.builder().name("key").type(ColumnType.INT32).build(),
+                            ColumnParams.builder().name("val").type(ColumnType.INT32).nullable(true).build()
+                    ))
+                    .primaryKeyColumns(List.of("key"))
+                    .build();
+
+            CompletableFuture<Void> fut = service.createTable(params);
+
+            verify(updateLogMock).append(any());
+            // TODO IGNITE-19400: recheck future completion guarantees
+            assertThat(fut, willBe((Object) null));
+
+            assertSame(service.schema(0), service.activeSchema(clock.nowLong()));
+            assertNull(service.table(TABLE_NAME, clock.nowLong()));
+
+            clock.update(clock.now().addPhysicalTime(delayDuration));
+
+            assertSame(service.schema(1), service.activeSchema(clock.nowLong()));
+            assertNotNull(service.table(TABLE_NAME, clock.nowLong()));
+        } finally {
+            service.stop();
+            metaStorageManager.stop();
+            vault.stop();
+        }
     }
 
     @Test
     public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
         UpdateLog updateLogMock = Mockito.mock(UpdateLog.class);
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock);
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, Mockito.mock(HybridClock.class));
 
         service.start();
 
@@ -1196,9 +1241,7 @@ public class CatalogServiceSelfTest {
                 .filter("expression")
                 .build();
 
-        CompletableFuture<Void> fut = service.createDistributionZone(params);
-
-        assertThat(fut, willBe((Object) null));
+        assertThat(service.createDistributionZone(params), willBe((Object) null));
 
         // Validate catalog version from the past.
         assertNull(service.zone(ZONE_NAME, 0));
@@ -1207,10 +1250,10 @@ public class CatalogServiceSelfTest {
         assertNull(service.zone(2, 123L));
 
         // Validate actual catalog
-        CatalogZoneDescriptor zone = service.zone(ZONE_NAME, System.currentTimeMillis());
+        CatalogZoneDescriptor zone = service.zone(ZONE_NAME, clock.nowLong());
 
         assertNotNull(zone);
-        assertSame(zone, service.zone(2, System.currentTimeMillis()));
+        assertSame(zone, service.zone(2, clock.nowLong()));
 
         // Validate newly created zone
         assertEquals(2L, zone.id());
@@ -1224,16 +1267,14 @@ public class CatalogServiceSelfTest {
     }
 
     @Test
-    public void testDropZone() throws InterruptedException {
+    public void testDropZone() {
         CreateZoneParams createZoneParams = CreateZoneParams.builder()
                 .zoneName(ZONE_NAME)
                 .build();
 
         assertThat(service.createDistributionZone(createZoneParams), willBe((Object) null));
 
-        long beforeDropTimestamp = System.currentTimeMillis();
-
-        Thread.sleep(5);
+        long beforeDropTimestamp = clock.nowLong();
 
         DropZoneParams params = DropZoneParams.builder()
                 .zoneName(ZONE_NAME)
@@ -1253,8 +1294,8 @@ public class CatalogServiceSelfTest {
         assertSame(zone, service.zone(2, beforeDropTimestamp));
 
         // Validate actual catalog
-        assertNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
-        assertNull(service.zone(2, System.currentTimeMillis()));
+        assertNull(service.zone(ZONE_NAME, clock.nowLong()));
+        assertNull(service.zone(2, clock.nowLong()));
 
         // Try to drop non-existing zone.
         assertThat(service.dropDistributionZone(params), willThrow(DistributionZoneNotFoundException.class));
@@ -1275,7 +1316,7 @@ public class CatalogServiceSelfTest {
 
         assertThat(service.createDistributionZone(createParams), willBe((Object) null));
 
-        long beforeDropTimestamp = System.currentTimeMillis();
+        long beforeDropTimestamp = clock.nowLong();
 
         Thread.sleep(5);
 
@@ -1291,19 +1332,19 @@ public class CatalogServiceSelfTest {
         assertSame(zone, service.zone(2, beforeDropTimestamp));
 
         // Validate actual catalog
-        zone = service.zone("RenamedZone", System.currentTimeMillis());
+        zone = service.zone("RenamedZone", clock.nowLong());
 
         assertNotNull(zone);
-        assertNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
+        assertNull(service.zone(ZONE_NAME, clock.nowLong()));
         assertEquals("RenamedZone", zone.name());
         assertEquals(2, zone.id());
 
-        assertSame(zone, service.zone(2, System.currentTimeMillis()));
+        assertSame(zone, service.zone(2, clock.nowLong()));
     }
 
     @Test
     public void testDefaultZone() {
-        CatalogZoneDescriptor defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis());
+        CatalogZoneDescriptor defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong());
 
         // Try to create zone with default zone name.
         CreateZoneParams createParams = CreateZoneParams.builder()
@@ -1314,7 +1355,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.createDistributionZone(createParams), willThrow(IgniteInternalException.class));
 
         // Validate default zone wasn't changed.
-        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
 
         // Try to rename default zone.
         RenameZoneParams renameZoneParams = RenameZoneParams.builder()
@@ -1324,8 +1365,8 @@ public class CatalogServiceSelfTest {
         assertThat(service.renameDistributionZone(renameZoneParams), willThrow(IgniteInternalException.class));
 
         // Validate default zone wasn't changed.
-        assertNull(service.zone("RenamedDefaultZone", System.currentTimeMillis()));
-        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+        assertNull(service.zone("RenamedDefaultZone", clock.nowLong()));
+        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
 
         // Try to drop default zone.
         DropZoneParams dropZoneParams = DropZoneParams.builder()
@@ -1334,7 +1375,7 @@ public class CatalogServiceSelfTest {
         assertThat(service.dropDistributionZone(dropZoneParams), willThrow(IgniteInternalException.class));
 
         // Validate default zone wasn't changed.
-        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
 
         // Try to rename to a zone with default name.
         createParams = CreateZoneParams.builder()
@@ -1348,12 +1389,12 @@ public class CatalogServiceSelfTest {
                 .build();
 
         assertThat(service.createDistributionZone(createParams), willBe((Object) null));
-        defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis());
+        defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong());
 
         assertThat(service.renameDistributionZone(renameZoneParams), willThrow(DistributionZoneAlreadyExistsException.class));
 
         // Validate default zone wasn't changed.
-        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+        assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
     }
 
     @Test
@@ -1379,9 +1420,9 @@ public class CatalogServiceSelfTest {
         assertThat(service.alterDistributionZone(alterZoneParams), willBe((Object) null));
 
         // Validate actual catalog
-        CatalogZoneDescriptor zone = service.zone(ZONE_NAME, System.currentTimeMillis());
+        CatalogZoneDescriptor zone = service.zone(ZONE_NAME, clock.nowLong());
         assertNotNull(zone);
-        assertSame(zone, service.zone(2, System.currentTimeMillis()));
+        assertSame(zone, service.zone(2, clock.nowLong()));
 
         assertEquals(ZONE_NAME, zone.name());
         assertEquals(2, zone.id());
@@ -1413,11 +1454,11 @@ public class CatalogServiceSelfTest {
         assertThat(service.createDistributionZone(params), willThrowFast(DistributionZoneAlreadyExistsException.class));
 
         // Validate zone was NOT changed
-        CatalogZoneDescriptor zone = service.zone(ZONE_NAME, System.currentTimeMillis());
+        CatalogZoneDescriptor zone = service.zone(ZONE_NAME, clock.nowLong());
 
         assertNotNull(zone);
-        assertSame(zone, service.zone(2, System.currentTimeMillis()));
-        assertNull(service.zone(3, System.currentTimeMillis()));
+        assertSame(zone, service.zone(2, clock.nowLong()));
+        assertNull(service.zone(3, clock.nowLong()));
 
         assertEquals(2L, zone.id());
         assertEquals(ZONE_NAME, zone.name());
@@ -1425,31 +1466,6 @@ public class CatalogServiceSelfTest {
         assertEquals(15, zone.replicas());
     }
 
-    @Test
-    public void testDropZoneIfExistsFlag() {
-        CreateZoneParams createZoneParams = CreateZoneParams.builder()
-                .zoneName(ZONE_NAME)
-                .build();
-
-        assertThat(service.createDistributionZone(createZoneParams), willBe((Object) null));
-
-        assertNotNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
-        assertNotNull(service.zone(1, System.currentTimeMillis()));
-
-        DropZoneParams params = DropZoneParams.builder()
-                .zoneName(ZONE_NAME)
-                .build();
-
-        assertThat(service.dropDistributionZone(params), willBe((Object) null));
-
-        // Drop non-existing zone.
-        assertThat(service.dropDistributionZone(params), willThrowFast(DistributionZoneNotFoundException.class));
-
-        // Validate actual catalog
-        assertNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
-        assertNull(service.zone(2, System.currentTimeMillis()));
-    }
-
     @Test
     public void testCreateZoneEvents() {
         CreateZoneParams createZoneParams = CreateZoneParams.builder()
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 375fcdd4e0..e41ba4eb53 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -82,8 +82,8 @@ class UpdateLogImplTest {
         metastore.deployWatches();
 
         List<VersionedUpdate> expectedLog = List.of(
-                new VersionedUpdate(1, List.of(new TestUpdateEntry("foo"))),
-                new VersionedUpdate(2, List.of(new TestUpdateEntry("bar")))
+                new VersionedUpdate(1, 1L, List.of(new TestUpdateEntry("foo"))),
+                new VersionedUpdate(2, 2L, List.of(new TestUpdateEntry("bar")))
         );
 
         for (VersionedUpdate update : expectedLog) {
@@ -172,7 +172,7 @@ class UpdateLogImplTest {
     }
 
     private static VersionedUpdate singleEntryUpdateOfVersion(int version) {
-        return new VersionedUpdate(version, List.of(new TestUpdateEntry("foo_" + version)));
+        return new VersionedUpdate(version, version, List.of(new TestUpdateEntry("foo_" + version)));
     }
 
     static class TestUpdateEntry implements UpdateEntry {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index b414e06169..1eb85a25c5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -743,7 +743,7 @@ public class ItRebalanceDistributedTest {
                     metaStorageManager,
                     clusterService);
 
-            catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageManager, vaultManager));
+            catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageManager, vaultManager), hybridClock);
 
             schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index ab485aa530..b43e47caa5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -396,7 +396,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
                 new RaftGroupEventsClientListener()
         );
 
-        var catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vault));
+        var catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vault), hybridClock);
 
         TableManager tableManager = new TableManager(
                 name,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2cd7c4e420..d937915332 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -481,7 +481,7 @@ public class IgniteImpl implements Ignite {
 
         outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService());
 
-        catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vaultMgr));
+        catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vaultMgr), clock);
 
         distributedTblMgr = new TableManager(
                 name,