You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/08/24 15:50:31 UTC
[ignite-3] 01/02: Plug SchemaSyncService in.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit cb5d1b2c45605533301274a02fbf9ed1e482a379
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Aug 24 17:02:43 2023 +0300
Plug SchemaSyncService in.
---
.../internal/catalog/CatalogManagerImpl.java | 11 +++-----
.../internal/catalog/CatalogManagerSelfTest.java | 4 +--
.../internal/catalog/BaseCatalogManagerTest.java | 2 +-
.../ignite/internal/catalog/CatalogTestUtils.java | 4 +--
.../testframework/TestIgnitionManager.java | 29 +++++++++++++++-------
.../ignite/internal/index/IndexManagerTest.java | 2 +-
.../storage/ItRebalanceDistributedTest.java | 18 +++++++++++---
...niteDistributionZoneManagerNodeRestartTest.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 17 +++++++++++--
.../org/apache/ignite/internal/app/IgniteImpl.java | 10 +++++++-
.../internal/table/distributed/TableManager.java | 7 +++++-
.../table/distributed/TableManagerTest.java | 4 ++-
12 files changed, 78 insertions(+), 32 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index a99dbd7543..06cc68ff65 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -107,6 +107,7 @@ import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Catalog service implementation.
@@ -137,14 +138,8 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam
/**
* Constructor.
*/
- public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
- this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
- }
-
- /**
- * Constructor.
- */
- CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) {
+ @TestOnly
+ public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) {
this(updateLog, clockWaiter, () -> delayDurationMs);
}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 3ec2cc3c1f..6994c72226 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1081,7 +1081,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter);
+ CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter, 0L);
manager.start();
when(updateLogMock.append(any())).thenAnswer(invocation -> {
@@ -1152,7 +1152,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest {
public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
UpdateLog updateLogMock = mock(UpdateLog.class);
- CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter);
+ CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter, 0L);
manager.start();
diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
index 8d3f230eb0..1b7c6830ad 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
@@ -64,7 +64,7 @@ public abstract class BaseCatalogManagerTest extends BaseIgniteAbstractTest {
updateLog = spy(new UpdateLogImpl(metastore));
clockWaiter = spy(new ClockWaiter(NODE_NAME, clock));
- manager = new CatalogManagerImpl(updateLog, clockWaiter);
+ manager = new CatalogManagerImpl(updateLog, clockWaiter, 0L);
vault.start();
metastore.start();
diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 03b15ac741..d313ca9f5d 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -47,7 +47,7 @@ public class CatalogTestUtils {
var clockWaiter = new ClockWaiter(nodeName, clock);
- return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter) {
+ return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter, 0L) {
@Override
public void start() {
vault.start();
@@ -91,7 +91,7 @@ public class CatalogTestUtils {
public static CatalogManager createTestCatalogManager(String nodeName, HybridClock clock, MetaStorageManager metastore) {
var clockWaiter = new ClockWaiter(nodeName, clock);
- return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter) {
+ return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter, 0L) {
@Override
public void start() {
clockWaiter.start();
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
index dc4d66f463..cbab5d1930 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
@@ -37,6 +37,9 @@ public class TestIgnitionManager {
/** Default name of configuration file. */
public static final String DEFAULT_CONFIG_NAME = "ignite-config.conf";
+ private static final int DEFAULT_DELAY_DURATION_MS = 100;
+ private static final int DEFAULT_METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS = 10;
+
/**
* Starts an Ignite node with an optional bootstrap configuration from an input stream with HOCON configs.
*
@@ -100,20 +103,28 @@ public class TestIgnitionManager {
.metaStorageNodeNames(params.metaStorageNodeNames())
.cmgNodeNames(params.cmgNodeNames());
+ ConfigDocument configDocument;
+
if (params.clusterConfiguration() == null) {
- builder.clusterConfiguration("{ schemaSync.delayDuration: 0 }");
+ configDocument = ConfigDocumentFactory.parseString("{}");
} else {
- ConfigDocument configDocument = ConfigDocumentFactory.parseString(params.clusterConfiguration());
-
- String delayDurationPath = "schemaSync.delayDuration";
+ configDocument = ConfigDocumentFactory.parseString(params.clusterConfiguration());
+ }
- if (!configDocument.hasPath(delayDurationPath)) {
- ConfigDocument updatedDocument = configDocument.withValueText(delayDurationPath, "0");
+ configDocument = amendSetting(configDocument, "schemaSync.delayDuration", Integer.toString(DEFAULT_DELAY_DURATION_MS));
+ configDocument = amendSetting(configDocument, "metaStorage.idleSyncTimeInterval",
+ Integer.toString(DEFAULT_METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS));
- builder.clusterConfiguration(updatedDocument.render());
- }
- }
+ builder.clusterConfiguration(configDocument.render());
return builder.build();
}
+
+ private static ConfigDocument amendSetting(ConfigDocument document, String path, String value) {
+ if (document.hasPath(path)) {
+ return document;
+ } else {
+ return document.withValueText(path, value);
+ }
+ }
}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index a434c0d9c3..2f7a89d844 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -136,7 +136,7 @@ public class IndexManagerTest {
clockWaiter = new ClockWaiter(nodeName, clock);
- catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageManager), clockWaiter);
+ catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageManager), clockWaiter, 0L);
indexManager = new IndexManager(
tablesConfig,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 814681226b..5e3002ec48 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
@@ -73,6 +74,7 @@ import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -148,6 +150,7 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -777,10 +780,18 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
clockWaiter = new ClockWaiter(name, hybridClock);
+ SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration(
+ SchemaSynchronizationConfiguration.KEY
+ );
+
+ LongSupplier delayDuration = () -> schemaSyncConfig.delayDuration().value();
+
catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageManager),
- clockWaiter
- );
+ clockWaiter,
+ delayDuration);
+
+ var schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), catalogManager, delayDuration);
schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
@@ -820,7 +831,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
vaultManager,
cmgManager,
distributionZoneManager,
- catalogManager
+ catalogManager,
+ schemaSyncService
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 029946c3ba..393fec38ed 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -219,7 +219,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
var clockWaiter = new ClockWaiter(name, clock);
- var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter);
+ var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter, 0L);
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
name,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 045684c66a..2371f7c57f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import com.google.common.base.Supplier;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -112,6 +115,7 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -342,7 +346,15 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
var clockWaiter = new ClockWaiter(name, clock);
- var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageMgr), clockWaiter);
+ SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration(
+ SchemaSynchronizationConfiguration.KEY
+ );
+
+ LongSupplier delayDuration = () -> schemaSyncConfig.delayDuration().value();
+
+ var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageMgr), clockWaiter, delayDuration);
+
+ var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), catalogManager, delayDuration);
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
name,
@@ -380,7 +392,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
vault,
null,
null,
- catalogManager
+ catalogManager,
+ schemaSyncService
);
var indexManager = new IndexManager(tablesConfig, schemaManager, tableManager, catalogManager, metaStorageMgr, registry);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fd1a505485..e4a5924a1f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -128,6 +128,7 @@ import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -504,6 +505,12 @@ public class IgniteImpl implements Ignite {
() -> schemaSyncConfig.delayDuration().value()
);
+ SchemaSyncServiceImpl schemaSyncService = new SchemaSyncServiceImpl(
+ metaStorageManager().clusterTime(),
+ catalogManager,
+ () -> schemaSyncConfig.delayDuration().value()
+ );
+
distributionZoneManager = new DistributionZoneManager(
name,
registry,
@@ -544,7 +551,8 @@ public class IgniteImpl implements Ignite {
vaultMgr,
cmgMgr,
distributionZoneManager,
- catalogManager
+ catalogManager,
+ schemaSyncService
);
indexManager = new IndexManager(tablesConfig, schemaManager, distributedTblMgr, catalogManager, metaStorageMgr, registry);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index feeb83f49d..8581f93056 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -167,6 +167,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Snaps
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
import org.apache.ignite.internal.table.event.TableEvent;
@@ -374,6 +375,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private final ConfiguredTablesCache configuredTablesCache;
+ private final SchemaSyncService schemaSyncService;
+
private final CatalogManager catalogManager;
/** Versioned value used only at manager startup to correctly fire table creation events. */
@@ -426,7 +429,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
VaultManager vaultManager,
ClusterManagementGroupManager cmgMgr,
DistributionZoneManager distributionZoneManager,
- CatalogManager catalogManager
+ CatalogManager catalogManager,
+ SchemaSyncService schemaSyncService
) {
this.tablesCfg = tablesCfg;
this.zonesConfig = zonesConfig;
@@ -450,6 +454,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
this.cmgMgr = cmgMgr;
this.distributionZoneManager = distributionZoneManager;
this.catalogManager = catalogManager;
+ this.schemaSyncService = schemaSyncService;
clusterNodeResolver = topologyService::getByConsistentId;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index afbdf4056f..d3ebddfced 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
@@ -836,7 +837,8 @@ public class TableManagerTest extends IgniteAbstractTest {
vaultManager,
cmgMgr,
distributionZoneManager,
- mock(CatalogManager.class)
+ mock(CatalogManager.class),
+ mock(SchemaSyncService.class)
) {
@Override