You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/06/09 14:27:57 UTC
[ignite-3] branch main updated: IGNITE-19642 CatalogService should use HybridClock to register new Catalog version (#2154)
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 85b18910ee IGNITE-19642 CatalogService should use HybridClock to register new Catalog version (#2154)
85b18910ee is described below
commit 85b18910ee83dc52c05277997d24408741a15e78
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Fri Jun 9 17:27:51 2023 +0300
IGNITE-19642 CatalogService should use HybridClock to register new Catalog version (#2154)
---
.../internal/catalog/CatalogServiceImpl.java | 66 ++++--
.../internal/catalog/storage/VersionedUpdate.java | 11 +-
.../internal/catalog/CatalogServiceSelfTest.java | 236 +++++++++++----------
.../catalog/storage/UpdateLogImplTest.java | 6 +-
.../storage/ItRebalanceDistributedTest.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
7 files changed, 186 insertions(+), 139 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 8b3acbcd25..133904021e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.catalog.storage.UpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
+import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.Producer;
@@ -112,6 +113,9 @@ import org.jetbrains.annotations.Nullable;
public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParameters> implements CatalogManager {
private static final int MAX_RETRY_COUNT = 10;
+ /** Default delay duration (passed to the constructor as {@code delayDurationMs}) to wait before a new Catalog version is activated. */
+ private static final int DEFAULT_DELAY_DURATION = 0;
+
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(CatalogServiceImpl.class);
@@ -125,11 +129,24 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
+ private final HybridClock clock;
+
+ private final long delayDurationMs;
+
+ /**
+ * Constructor that delegates to the three-argument constructor, passing {@code DEFAULT_DELAY_DURATION} as the delay duration.
+ */
+ public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
+ this(updateLog, clock, DEFAULT_DELAY_DURATION);
+ }
+
/**
* Constructor.
*/
- public CatalogServiceImpl(UpdateLog updateLog) {
+ public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long delayDurationMs) {
this.updateLog = updateLog;
+ this.clock = clock;
+ this.delayDurationMs = delayDurationMs;
}
/** {@inheritDoc} */
@@ -767,8 +784,10 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
}
int newVersion = catalog.version() + 1;
+ //TODO https://issues.apache.org/jira/browse/IGNITE-19209 Make the activation time stored in the MetaStorage (MS) entry strictly equal to the MS entry timestamp plus the delay duration (DD).
+ long activationTimestamp = activationTimestamp();
- return updateLog.append(new VersionedUpdate(newVersion, updates))
+ return updateLog.append(new VersionedUpdate(newVersion, activationTimestamp, updates))
.thenCompose(result -> versionTracker.waitFor(newVersion).thenApply(none -> result))
.thenCompose(result -> {
if (result) {
@@ -783,6 +802,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
@Override
public void handle(VersionedUpdate update) {
int version = update.version();
+ long activationTimestamp = update.activationTimestamp();
Catalog catalog = catalogByVer.get(version - 1);
assert catalog != null;
@@ -796,7 +816,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
if (entry instanceof NewTableEntry) {
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -817,7 +837,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -838,7 +858,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -869,7 +889,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -898,7 +918,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
} else if (entry instanceof NewIndexEntry) {
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -918,7 +938,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -936,7 +956,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
} else if (entry instanceof NewZoneEntry) {
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
CollectionUtils.concat(catalog.zones(), List.of(((NewZoneEntry) entry).descriptor())),
catalog.schemas()
@@ -951,7 +971,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones().stream().filter(z -> z.id() != zoneId).collect(Collectors.toList()),
catalog.schemas()
@@ -966,7 +986,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones().stream()
.map(z -> z.id() == descriptor.id() ? descriptor : z)
@@ -981,15 +1001,10 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
} else if (entry instanceof ObjectIdGenUpdateEntry) {
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState() + ((ObjectIdGenUpdateEntry) entry).delta(),
catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- schema.tables(),
- schema.indexes()
- ))
+ catalog.schemas()
);
} else if (entry instanceof AlterColumnEntry) {
int tableId = ((AlterColumnEntry) entry).tableId();
@@ -997,7 +1012,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
catalog = new Catalog(
version,
- System.currentTimeMillis(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
List.of(new CatalogSchemaDescriptor(
@@ -1033,17 +1048,24 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
registerCatalog(catalog);
CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new))
- .thenRun(() -> versionTracker.update(version, null))
.whenComplete((ignore, err) -> {
if (err != null) {
LOG.warn("Failed to apply catalog update.", err);
- } else {
- versionTracker.update(version, null);
+ //TODO: IGNITE-14611 Pass the exception to an error handler, because the catalog is now in an inconsistent state.
}
+
+ versionTracker.update(version, null);
});
}
}
+ /**
+ * Calculates the activation timestamp for a new catalog version: the current clock time advanced by {@code delayDurationMs} of physical time, returned as a long value.
+ */
+ private long activationTimestamp() {
+ return clock.now().addPhysicalTime(delayDurationMs).longValue();
+ }
+
private static void throwUnsupportedDdl(String msg, Object... params) {
throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, IgniteStringFormatter.format(msg, params));
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
index 195c19c760..437534ccf9 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
@@ -31,6 +31,8 @@ public class VersionedUpdate implements Serializable {
private final int version;
+ private final long activationTimestamp;
+
@IgniteToStringInclude
private final List<UpdateEntry> entries;
@@ -38,10 +40,12 @@ public class VersionedUpdate implements Serializable {
* Constructs the object.
*
* @param version A version the changes relate to.
+ * @param activationTimestamp Timestamp at which the given changes become active.
* @param entries A list of changes.
*/
- public VersionedUpdate(int version, List<UpdateEntry> entries) {
+ public VersionedUpdate(int version, long activationTimestamp, List<UpdateEntry> entries) {
this.version = version;
+ this.activationTimestamp = activationTimestamp;
this.entries = List.copyOf(
Objects.requireNonNull(entries, "entries")
);
@@ -52,6 +56,11 @@ public class VersionedUpdate implements Serializable {
return version;
}
+ /** Returns the timestamp at which the changes of this update become active. */
+ public long activationTimestamp() {
+ return activationTimestamp;
+ }
+
/** Returns list of changes. */
public List<UpdateEntry> entries() {
return entries;
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 91ffa02264..2d09e3f26c 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -83,6 +83,8 @@ import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
@@ -131,6 +133,8 @@ public class CatalogServiceSelfTest {
private CatalogServiceImpl service;
+ private HybridClock clock;
+
@BeforeEach
void setUp() throws NodeStoppingException {
vault = new VaultManager(new InMemoryVaultService());
@@ -139,7 +143,8 @@ public class CatalogServiceSelfTest {
vault, new SimpleInMemoryKeyValueStorage("test")
);
- service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault));
+ clock = new HybridClockImpl();
+ service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault), clock, 0L);
vault.start();
metastore.start();
@@ -157,14 +162,14 @@ public class CatalogServiceSelfTest {
@Test
public void testEmptyCatalog() {
- assertNotNull(service.activeSchema(System.currentTimeMillis()));
+ assertNotNull(service.activeSchema(clock.nowLong()));
assertNotNull(service.schema(0));
assertNull(service.schema(1));
assertThrows(IllegalStateException.class, () -> service.activeSchema(-1L));
- assertNull(service.table(0, System.currentTimeMillis()));
- assertNull(service.index(0, System.currentTimeMillis()));
+ assertNull(service.table(0, clock.nowLong()));
+ assertNull(service.index(0, clock.nowLong()));
CatalogSchemaDescriptor schema = service.schema(0);
assertEquals(SCHEMA_NAME, schema.name());
@@ -173,7 +178,7 @@ public class CatalogServiceSelfTest {
assertEquals(0, schema.tables().length);
assertEquals(0, schema.indexes().length);
- CatalogZoneDescriptor zone = service.zone(1, System.currentTimeMillis());
+ CatalogZoneDescriptor zone = service.zone(1, clock.nowLong());
assertEquals(CatalogService.DEFAULT_ZONE_NAME, zone.name());
assertEquals(1, zone.id());
@@ -223,10 +228,10 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+ assertSame(schema, service.activeSchema(clock.nowLong()));
- assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis()));
- assertSame(schema.table(TABLE_NAME), service.table(2, System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, clock.nowLong()));
+ assertSame(schema.table(TABLE_NAME), service.table(2, clock.nowLong()));
// Validate newly created table
CatalogTableDescriptor table = schema.table(TABLE_NAME);
@@ -245,13 +250,13 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+ assertSame(schema, service.activeSchema(clock.nowLong()));
- assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis()));
- assertSame(schema.table(TABLE_NAME), service.table(2, System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, clock.nowLong()));
+ assertSame(schema.table(TABLE_NAME), service.table(2, clock.nowLong()));
- assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, System.currentTimeMillis()));
- assertSame(schema.table(TABLE_NAME_2), service.table(3, System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, clock.nowLong()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(3, clock.nowLong()));
assertNotSame(schema.table(TABLE_NAME), schema.table(TABLE_NAME_2));
@@ -259,17 +264,15 @@ public class CatalogServiceSelfTest {
assertThat(service.createTable(simpleTable(TABLE_NAME_2)), willThrowFast(TableAlreadyExistsException.class));
// Validate schema wasn't changed.
- assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+ assertSame(schema, service.activeSchema(clock.nowLong()));
}
@Test
- public void testDropTable() throws InterruptedException {
+ public void testDropTable() {
assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
assertThat(service.createTable(simpleTable(TABLE_NAME_2)), willBe((Object) null));
- long beforeDropTimestamp = System.currentTimeMillis();
-
- Thread.sleep(5);
+ long beforeDropTimestamp = clock.nowLong();
DropTableParams dropTableParams = DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
@@ -295,24 +298,24 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+ assertSame(schema, service.activeSchema(clock.nowLong()));
assertNull(schema.table(TABLE_NAME));
- assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
- assertNull(service.table(2, System.currentTimeMillis()));
+ assertNull(service.table(TABLE_NAME, clock.nowLong()));
+ assertNull(service.table(2, clock.nowLong()));
- assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, System.currentTimeMillis()));
- assertSame(schema.table(TABLE_NAME_2), service.table(3, System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, clock.nowLong()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(3, clock.nowLong()));
// Try to drop table once again.
assertThat(service.dropTable(dropTableParams), willThrowFast(TableNotFoundException.class));
// Validate schema wasn't changed.
- assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+ assertSame(schema, service.activeSchema(clock.nowLong()));
}
@Test
- public void testAddColumn() throws InterruptedException {
+ public void testAddColumn() {
assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
AlterTableAddColumnParams params = AlterTableAddColumnParams.builder()
@@ -327,9 +330,7 @@ public class CatalogServiceSelfTest {
))
.build();
- long beforeAddedTimestamp = System.currentTimeMillis();
-
- Thread.sleep(5);
+ long beforeAddedTimestamp = clock.nowLong();
assertThat(service.addColumn(params), willBe((Object) null));
@@ -341,7 +342,7 @@ public class CatalogServiceSelfTest {
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
// Validate actual catalog
- schema = service.activeSchema(System.currentTimeMillis());
+ schema = service.activeSchema(clock.nowLong());
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
@@ -361,7 +362,7 @@ public class CatalogServiceSelfTest {
}
@Test
- public void testDropColumn() throws InterruptedException {
+ public void testDropColumn() {
assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
// Validate dropping column
@@ -371,9 +372,7 @@ public class CatalogServiceSelfTest {
.columns(Set.of("VAL"))
.build();
- long beforeAddedTimestamp = System.currentTimeMillis();
-
- Thread.sleep(5);
+ long beforeAddedTimestamp = clock.nowLong();
assertThat(service.dropColumn(params), willBe((Object) null));
@@ -385,7 +384,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema.table(TABLE_NAME).column("VAL"));
// Validate actual catalog
- schema = service.activeSchema(System.currentTimeMillis());
+ schema = service.activeSchema(clock.nowLong());
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
@@ -394,7 +393,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateDropColumnIfTableNotExists() {
- assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
+ assertNull(service.table(TABLE_NAME, clock.nowLong()));
// Try to add a new column.
AlterTableAddColumnParams addColumnParams = AlterTableAddColumnParams.builder()
@@ -439,7 +438,7 @@ public class CatalogServiceSelfTest {
assertThat(service.dropColumn(params), willThrow(SqlException.class));
// Validate actual catalog
- CatalogSchemaDescriptor schema = service.activeSchema(System.currentTimeMillis());
+ CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
assertSame(service.schema(2), schema);
@@ -465,7 +464,7 @@ public class CatalogServiceSelfTest {
assertThat(service.addColumn(addColumnParams), willThrow(ColumnAlreadyExistsException.class));
// Validate no column added.
- CatalogSchemaDescriptor schema = service.activeSchema(System.currentTimeMillis());
+ CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
@@ -482,7 +481,7 @@ public class CatalogServiceSelfTest {
assertThat(service.addColumn(addColumnParams), willBe((Object) null));
// Validate both columns added.
- schema = service.activeSchema(System.currentTimeMillis());
+ schema = service.activeSchema(clock.nowLong());
assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -497,7 +496,7 @@ public class CatalogServiceSelfTest {
assertThat(service.dropColumn(dropColumnParams), willBe((Object) null));
// Validate both columns dropped.
- schema = service.activeSchema(System.currentTimeMillis());
+ schema = service.activeSchema(clock.nowLong());
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -512,7 +511,7 @@ public class CatalogServiceSelfTest {
assertThat(service.dropColumn(dropColumnParams), willThrow(ColumnNotFoundException.class));
// Validate no column dropped.
- schema = service.activeSchema(System.currentTimeMillis());
+ schema = service.activeSchema(clock.nowLong());
assertNotNull(schema.table(TABLE_NAME).column("VAL"));
}
@@ -902,7 +901,7 @@ public class CatalogServiceSelfTest {
}
@Test
- public void testDropTableWithIndex() throws InterruptedException {
+ public void testDropTableWithIndex() {
CreateHashIndexParams params = CreateHashIndexParams.builder()
.indexName(INDEX_NAME)
.tableName(TABLE_NAME)
@@ -912,9 +911,7 @@ public class CatalogServiceSelfTest {
assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
assertThat(service.createIndex(params), willBe((Object) null));
- long beforeDropTimestamp = System.currentTimeMillis();
-
- Thread.sleep(5);
+ long beforeDropTimestamp = clock.nowLong();
DropTableParams dropTableParams = DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build();
@@ -940,15 +937,15 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
assertEquals(CatalogService.PUBLIC, schema.name());
- assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+ assertSame(schema, service.activeSchema(clock.nowLong()));
assertNull(schema.table(TABLE_NAME));
- assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
- assertNull(service.table(2, System.currentTimeMillis()));
+ assertNull(service.table(TABLE_NAME, clock.nowLong()));
+ assertNull(service.table(2, clock.nowLong()));
assertNull(schema.index(INDEX_NAME));
- assertNull(service.index(INDEX_NAME, System.currentTimeMillis()));
- assertNull(service.index(3, System.currentTimeMillis()));
+ assertNull(service.index(INDEX_NAME, clock.nowLong()));
+ assertNull(service.index(3, clock.nowLong()));
}
@Test
@@ -975,9 +972,9 @@ public class CatalogServiceSelfTest {
schema = service.schema(2);
assertNotNull(schema);
- assertNull(service.index(1, System.currentTimeMillis()));
- assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis()));
- assertSame(schema.index(INDEX_NAME), service.index(3, System.currentTimeMillis()));
+ assertNull(service.index(1, clock.nowLong()));
+ assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, clock.nowLong()));
+ assertSame(schema.index(INDEX_NAME), service.index(3, clock.nowLong()));
// Validate newly created hash index
CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) schema.index(INDEX_NAME);
@@ -1016,9 +1013,9 @@ public class CatalogServiceSelfTest {
schema = service.schema(2);
assertNotNull(schema);
- assertNull(service.index(1, System.currentTimeMillis()));
- assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis()));
- assertSame(schema.index(INDEX_NAME), service.index(3, System.currentTimeMillis()));
+ assertNull(service.index(1, clock.nowLong()));
+ assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, clock.nowLong()));
+ assertSame(schema.index(INDEX_NAME), service.index(3, clock.nowLong()));
// Validate newly created sorted index
CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) schema.index(INDEX_NAME);
@@ -1056,7 +1053,7 @@ public class CatalogServiceSelfTest {
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock);
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, clock);
service.start();
when(updateLogMock.append(any())).thenAnswer(invocation -> {
@@ -1067,6 +1064,7 @@ public class CatalogServiceSelfTest {
VersionedUpdate update = new VersionedUpdate(
updateFromInvocation.version(),
+ updateFromInvocation.activationTimestamp(),
List.of(new ObjectIdGenUpdateEntry(1))
);
@@ -1080,14 +1078,61 @@ public class CatalogServiceSelfTest {
assertThat(createTableFut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
// retry limit is hardcoded at org.apache.ignite.internal.catalog.CatalogServiceImpl.MAX_RETRY_COUNT
- Mockito.verify(updateLogMock, times(10)).append(any());
+ verify(updateLogMock, times(10)).append(any());
+ }
+
+ @Test
+ public void catalogActivationTime() throws Exception {
+ final int delayDuration = 3_000;
+
+ InMemoryVaultService vaultService = new InMemoryVaultService();
+ VaultManager vault = new VaultManager(vaultService);
+ StandaloneMetaStorageManager metaStorageManager = StandaloneMetaStorageManager.create(vault);
+ UpdateLog updateLogMock = Mockito.spy(new UpdateLogImpl(metaStorageManager, vault));
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, clock, delayDuration);
+
+ vault.start();
+ metaStorageManager.start();
+ service.start();
+
+ metaStorageManager.deployWatches();
+
+ try {
+ CreateTableParams params = CreateTableParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+ .columns(List.of(
+ ColumnParams.builder().name("key").type(ColumnType.INT32).build(),
+ ColumnParams.builder().name("val").type(ColumnType.INT32).nullable(true).build()
+ ))
+ .primaryKeyColumns(List.of("key"))
+ .build();
+
+ CompletableFuture<Void> fut = service.createTable(params);
+
+ verify(updateLogMock).append(any());
+ // TODO IGNITE-19400: recheck future completion guarantees
+ assertThat(fut, willBe((Object) null));
+
+ assertSame(service.schema(0), service.activeSchema(clock.nowLong()));
+ assertNull(service.table(TABLE_NAME, clock.nowLong()));
+
+ clock.update(clock.now().addPhysicalTime(delayDuration));
+
+ assertSame(service.schema(1), service.activeSchema(clock.nowLong()));
+ assertNotNull(service.table(TABLE_NAME, clock.nowLong()));
+ } finally {
+ service.stop();
+ metaStorageManager.stop();
+ vault.stop();
+ }
}
@Test
public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
UpdateLog updateLogMock = Mockito.mock(UpdateLog.class);
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock);
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, Mockito.mock(HybridClock.class));
service.start();
@@ -1196,9 +1241,7 @@ public class CatalogServiceSelfTest {
.filter("expression")
.build();
- CompletableFuture<Void> fut = service.createDistributionZone(params);
-
- assertThat(fut, willBe((Object) null));
+ assertThat(service.createDistributionZone(params), willBe((Object) null));
// Validate catalog version from the past.
assertNull(service.zone(ZONE_NAME, 0));
@@ -1207,10 +1250,10 @@ public class CatalogServiceSelfTest {
assertNull(service.zone(2, 123L));
// Validate actual catalog
- CatalogZoneDescriptor zone = service.zone(ZONE_NAME, System.currentTimeMillis());
+ CatalogZoneDescriptor zone = service.zone(ZONE_NAME, clock.nowLong());
assertNotNull(zone);
- assertSame(zone, service.zone(2, System.currentTimeMillis()));
+ assertSame(zone, service.zone(2, clock.nowLong()));
// Validate newly created zone
assertEquals(2L, zone.id());
@@ -1224,16 +1267,14 @@ public class CatalogServiceSelfTest {
}
@Test
- public void testDropZone() throws InterruptedException {
+ public void testDropZone() {
CreateZoneParams createZoneParams = CreateZoneParams.builder()
.zoneName(ZONE_NAME)
.build();
assertThat(service.createDistributionZone(createZoneParams), willBe((Object) null));
- long beforeDropTimestamp = System.currentTimeMillis();
-
- Thread.sleep(5);
+ long beforeDropTimestamp = clock.nowLong();
DropZoneParams params = DropZoneParams.builder()
.zoneName(ZONE_NAME)
@@ -1253,8 +1294,8 @@ public class CatalogServiceSelfTest {
assertSame(zone, service.zone(2, beforeDropTimestamp));
// Validate actual catalog
- assertNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
- assertNull(service.zone(2, System.currentTimeMillis()));
+ assertNull(service.zone(ZONE_NAME, clock.nowLong()));
+ assertNull(service.zone(2, clock.nowLong()));
// Try to drop non-existing zone.
assertThat(service.dropDistributionZone(params), willThrow(DistributionZoneNotFoundException.class));
@@ -1275,7 +1316,7 @@ public class CatalogServiceSelfTest {
assertThat(service.createDistributionZone(createParams), willBe((Object) null));
- long beforeDropTimestamp = System.currentTimeMillis();
+ long beforeDropTimestamp = clock.nowLong();
Thread.sleep(5);
@@ -1291,19 +1332,19 @@ public class CatalogServiceSelfTest {
assertSame(zone, service.zone(2, beforeDropTimestamp));
// Validate actual catalog
- zone = service.zone("RenamedZone", System.currentTimeMillis());
+ zone = service.zone("RenamedZone", clock.nowLong());
assertNotNull(zone);
- assertNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
+ assertNull(service.zone(ZONE_NAME, clock.nowLong()));
assertEquals("RenamedZone", zone.name());
assertEquals(2, zone.id());
- assertSame(zone, service.zone(2, System.currentTimeMillis()));
+ assertSame(zone, service.zone(2, clock.nowLong()));
}
@Test
public void testDefaultZone() {
- CatalogZoneDescriptor defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis());
+ CatalogZoneDescriptor defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong());
// Try to create zone with default zone name.
CreateZoneParams createParams = CreateZoneParams.builder()
@@ -1314,7 +1355,7 @@ public class CatalogServiceSelfTest {
assertThat(service.createDistributionZone(createParams), willThrow(IgniteInternalException.class));
// Validate default zone wasn't changed.
- assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+ assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
// Try to rename default zone.
RenameZoneParams renameZoneParams = RenameZoneParams.builder()
@@ -1324,8 +1365,8 @@ public class CatalogServiceSelfTest {
assertThat(service.renameDistributionZone(renameZoneParams), willThrow(IgniteInternalException.class));
// Validate default zone wasn't changed.
- assertNull(service.zone("RenamedDefaultZone", System.currentTimeMillis()));
- assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+ assertNull(service.zone("RenamedDefaultZone", clock.nowLong()));
+ assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
// Try to drop default zone.
DropZoneParams dropZoneParams = DropZoneParams.builder()
@@ -1334,7 +1375,7 @@ public class CatalogServiceSelfTest {
assertThat(service.dropDistributionZone(dropZoneParams), willThrow(IgniteInternalException.class));
// Validate default zone wasn't changed.
- assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+ assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
// Try to rename to a zone with default name.
createParams = CreateZoneParams.builder()
@@ -1348,12 +1389,12 @@ public class CatalogServiceSelfTest {
.build();
assertThat(service.createDistributionZone(createParams), willBe((Object) null));
- defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis());
+ defaultZone = service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong());
assertThat(service.renameDistributionZone(renameZoneParams), willThrow(DistributionZoneAlreadyExistsException.class));
// Validate default zone wasn't changed.
- assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, System.currentTimeMillis()));
+ assertSame(defaultZone, service.zone(CatalogService.DEFAULT_ZONE_NAME, clock.nowLong()));
}
@Test
@@ -1379,9 +1420,9 @@ public class CatalogServiceSelfTest {
assertThat(service.alterDistributionZone(alterZoneParams), willBe((Object) null));
// Validate actual catalog
- CatalogZoneDescriptor zone = service.zone(ZONE_NAME, System.currentTimeMillis());
+ CatalogZoneDescriptor zone = service.zone(ZONE_NAME, clock.nowLong());
assertNotNull(zone);
- assertSame(zone, service.zone(2, System.currentTimeMillis()));
+ assertSame(zone, service.zone(2, clock.nowLong()));
assertEquals(ZONE_NAME, zone.name());
assertEquals(2, zone.id());
@@ -1413,11 +1454,11 @@ public class CatalogServiceSelfTest {
assertThat(service.createDistributionZone(params), willThrowFast(DistributionZoneAlreadyExistsException.class));
// Validate zone was NOT changed
- CatalogZoneDescriptor zone = service.zone(ZONE_NAME, System.currentTimeMillis());
+ CatalogZoneDescriptor zone = service.zone(ZONE_NAME, clock.nowLong());
assertNotNull(zone);
- assertSame(zone, service.zone(2, System.currentTimeMillis()));
- assertNull(service.zone(3, System.currentTimeMillis()));
+ assertSame(zone, service.zone(2, clock.nowLong()));
+ assertNull(service.zone(3, clock.nowLong()));
assertEquals(2L, zone.id());
assertEquals(ZONE_NAME, zone.name());
@@ -1425,31 +1466,6 @@ public class CatalogServiceSelfTest {
assertEquals(15, zone.replicas());
}
- @Test
- public void testDropZoneIfExistsFlag() {
- CreateZoneParams createZoneParams = CreateZoneParams.builder()
- .zoneName(ZONE_NAME)
- .build();
-
- assertThat(service.createDistributionZone(createZoneParams), willBe((Object) null));
-
- assertNotNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
- assertNotNull(service.zone(1, System.currentTimeMillis()));
-
- DropZoneParams params = DropZoneParams.builder()
- .zoneName(ZONE_NAME)
- .build();
-
- assertThat(service.dropDistributionZone(params), willBe((Object) null));
-
- // Drop non-existing zone.
- assertThat(service.dropDistributionZone(params), willThrowFast(DistributionZoneNotFoundException.class));
-
- // Validate actual catalog
- assertNull(service.zone(ZONE_NAME, System.currentTimeMillis()));
- assertNull(service.zone(2, System.currentTimeMillis()));
- }
-
@Test
public void testCreateZoneEvents() {
CreateZoneParams createZoneParams = CreateZoneParams.builder()
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 375fcdd4e0..e41ba4eb53 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -82,8 +82,8 @@ class UpdateLogImplTest {
metastore.deployWatches();
List<VersionedUpdate> expectedLog = List.of(
- new VersionedUpdate(1, List.of(new TestUpdateEntry("foo"))),
- new VersionedUpdate(2, List.of(new TestUpdateEntry("bar")))
+ new VersionedUpdate(1, 1L, List.of(new TestUpdateEntry("foo"))),
+ new VersionedUpdate(2, 2L, List.of(new TestUpdateEntry("bar")))
);
for (VersionedUpdate update : expectedLog) {
@@ -172,7 +172,7 @@ class UpdateLogImplTest {
}
private static VersionedUpdate singleEntryUpdateOfVersion(int version) {
- return new VersionedUpdate(version, List.of(new TestUpdateEntry("foo_" + version)));
+ return new VersionedUpdate(version, version, List.of(new TestUpdateEntry("foo_" + version)));
}
static class TestUpdateEntry implements UpdateEntry {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index b414e06169..1eb85a25c5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -743,7 +743,7 @@ public class ItRebalanceDistributedTest {
metaStorageManager,
clusterService);
- catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageManager, vaultManager));
+ catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageManager, vaultManager), hybridClock);
schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index ab485aa530..b43e47caa5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -396,7 +396,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
new RaftGroupEventsClientListener()
);
- var catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vault));
+ var catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vault), hybridClock);
TableManager tableManager = new TableManager(
name,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2cd7c4e420..d937915332 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -481,7 +481,7 @@ public class IgniteImpl implements Ignite {
outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService());
- catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vaultMgr));
+ catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vaultMgr), clock);
distributedTblMgr = new TableManager(
name,