You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/07/25 09:50:25 UTC
[ignite-3] 01/06: Mirror DistributionZone changes from Config to Catalog.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-19942
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 8392fe8631fbb5038f69ace2c8c07bd44a0e599d
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Jul 4 23:29:33 2023 +0300
Mirror DistributionZone changes from Config to Catalog.
---
.../internal/catalog/CatalogServiceImpl.java | 6 +-
.../catalog/descriptors/CatalogZoneDescriptor.java | 3 +
modules/distribution-zones/build.gradle | 2 +
.../distributionzones/DistributionZoneManager.java | 74 ++++++++++++++++++++--
.../BaseDistributionZoneManagerTest.java | 1 +
...ibutionZoneManagerConfigurationChangesTest.java | 1 +
.../DistributionZoneManagerTest.java | 1 +
.../DistributionZoneMockTest.java | 2 +
.../DistributionZonesTestUtil.java | 34 ++++++++++
.../storage/ItRebalanceDistributedTest.java | 1 +
...niteDistributionZoneManagerNodeRestartTest.java | 2 +
.../runner/app/ItIgniteNodeRestartTest.java | 15 +++--
.../org/apache/ignite/internal/app/IgniteImpl.java | 21 +++---
.../engine/exec/ddl/DdlCommandHandlerWrapper.java | 34 ----------
.../DdlCommandHandlerExceptionHandlingTest.java | 28 ++++----
15 files changed, 151 insertions(+), 74 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index a1981a8924..cc6f54b1a0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -149,18 +149,18 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
@Override
public void start() {
- int objectIdGen = 0;
+ int objectIdGen = 1;
// TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure.
CatalogSchemaDescriptor schemaPublic = new CatalogSchemaDescriptor(
- objectIdGen++,
+ 0,
DEFAULT_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0]
);
CatalogZoneDescriptor defaultZone = new CatalogZoneDescriptor(
- objectIdGen++,
+ 0,
DEFAULT_ZONE_NAME,
25,
1,
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
index fd636e2b68..01fd779cb0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
@@ -76,6 +76,9 @@ public class CatalogZoneDescriptor extends CatalogObjectDescriptor {
this.dataNodesAutoAdjustScaleUp = dataNodesAutoAdjustScaleUp;
this.dataNodesAutoAdjustScaleDown = dataNodesAutoAdjustScaleDown;
this.filter = filter;
+
+ // TODO: IGNITE-19719 Fix it: the data storage descriptor below is hard-coded.
+ this.dataStorage = new CatalogDataStorageDescriptor("aipersist", "default");
}
/**
diff --git a/modules/distribution-zones/build.gradle b/modules/distribution-zones/build.gradle
index c0d38eff4f..ab2b03d87d 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -26,6 +26,7 @@ dependencies {
annotationProcessor project(":ignite-configuration-annotation-processor")
annotationProcessor libs.auto.service
+ implementation project(':ignite-catalog')
implementation project(':ignite-core')
implementation project(':ignite-configuration-api')
implementation project(':ignite-api')
@@ -65,6 +66,7 @@ dependencies {
testFixturesImplementation libs.mockito.core
testFixturesImplementation libs.mockito.junit
testFixturesImplementation libs.hamcrest.core
+ testFixturesImplementation project(':ignite-catalog')
testFixturesImplementation project(':ignite-raft-api')
testFixturesImplementation project(':ignite-metastorage')
testFixturesImplementation project(':ignite-schema')
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index d61f044560..97863daf36 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.distributionzones;
import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
@@ -92,6 +93,12 @@ import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.AlterZoneParams;
+import org.apache.ignite.internal.catalog.commands.CreateZoneParams;
+import org.apache.ignite.internal.catalog.commands.DropZoneParams;
+import org.apache.ignite.internal.catalog.commands.RenameZoneParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -185,6 +192,9 @@ public class DistributionZoneManager implements IgniteComponent {
/** Tables configuration. */
private final TablesConfiguration tablesConfiguration;
+ /** Catalog manager. */
+ private final CatalogManager catalogManager;
+
/** Meta Storage manager. */
private final MetaStorageManager metaStorageManager;
@@ -256,6 +266,7 @@ public class DistributionZoneManager implements IgniteComponent {
*
* @param zonesConfiguration Distribution zones configuration.
* @param tablesConfiguration Tables configuration.
+ * @param catalogManager Catalog manager.
* @param metaStorageManager Meta Storage manager.
* @param logicalTopologyService Logical topology service.
* @param vaultMgr Vault manager.
@@ -264,6 +275,7 @@ public class DistributionZoneManager implements IgniteComponent {
public DistributionZoneManager(
DistributionZonesConfiguration zonesConfiguration,
TablesConfiguration tablesConfiguration,
+ CatalogManager catalogManager,
MetaStorageManager metaStorageManager,
LogicalTopologyService logicalTopologyService,
VaultManager vaultMgr,
@@ -274,6 +286,7 @@ public class DistributionZoneManager implements IgniteComponent {
this.metaStorageManager = metaStorageManager;
this.logicalTopologyService = logicalTopologyService;
this.vaultMgr = vaultMgr;
+ this.catalogManager = catalogManager;
this.topologyWatchListener = createMetastorageTopologyListener();
@@ -386,9 +399,37 @@ public class DistributionZoneManager implements IgniteComponent {
}
try {
- CompletableFuture<Integer> fut = new CompletableFuture<>();
+ return catalogManager.createDistributionZone(CreateZoneParams.builder()
+ .zoneName(distributionZoneCfg.name())
+ .partitions(distributionZoneCfg.partitions())
+ .filter(distributionZoneCfg.filter())
+ .replicas(distributionZoneCfg.replicas())
+ .dataNodesAutoAdjust(distributionZoneCfg.dataNodesAutoAdjust())
+ .dataNodesAutoAdjustScaleUp(distributionZoneCfg.dataNodesAutoAdjustScaleUp())
+ .dataNodesAutoAdjustScaleDown(distributionZoneCfg.dataNodesAutoAdjustScaleDown())
+ .build())
+ .thenApply(ignore -> catalogManager.zone(distributionZoneCfg.name(), Long.MAX_VALUE))
+ .thenCompose(zoneDescriptor -> createZone(zoneDescriptor.id(), distributionZoneCfg))
+ .whenComplete((id, ex) -> {
+ if (ex != null) {
+ LOG.warn("Failed to create zone.", ex);
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
- int[] zoneIdContainer = new int[1];
+ private CompletableFuture<Integer> createZone(
+ int intZoneId,
+ DistributionZoneConfigurationParameters distributionZoneCfg
+ ) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ CompletableFuture<Integer> fut = new CompletableFuture<>();
zonesConfiguration.change(zonesChange -> zonesChange.changeDistributionZones(zonesListChange -> {
try {
@@ -427,11 +468,9 @@ public class DistributionZoneManager implements IgniteComponent {
zoneChange.changeDataNodesAutoAdjustScaleDown(distributionZoneCfg.dataNodesAutoAdjustScaleDown());
}
- int intZoneId = zonesChange.globalIdCounter() + 1;
zonesChange.changeGlobalIdCounter(intZoneId);
zoneChange.changeZoneId(intZoneId);
- zoneIdContainer[0] = intZoneId;
});
} catch (ConfigurationNodeAlreadyExistException e) {
throw new DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e);
@@ -445,7 +484,7 @@ public class DistributionZoneManager implements IgniteComponent {
ConfigurationValidationException.class)
);
} else {
- fut.complete(zoneIdContainer[0]);
+ fut.complete(intZoneId);
}
});
@@ -496,6 +535,23 @@ public class DistributionZoneManager implements IgniteComponent {
try {
CompletableFuture<Void> fut = new CompletableFuture<>();
+ CompletableFuture<Void> catalogFut = catalogManager.alterDistributionZone(AlterZoneParams.builder()
+ .zoneName(name)
+ .partitions(distributionZoneCfg.partitions())
+ .replicas(distributionZoneCfg.replicas())
+ .filter(distributionZoneCfg.filter())
+ .dataNodesAutoAdjust(distributionZoneCfg.dataNodesAutoAdjust())
+ .dataNodesAutoAdjustScaleUp(distributionZoneCfg.dataNodesAutoAdjustScaleUp())
+ .dataNodesAutoAdjustScaleDown(distributionZoneCfg.dataNodesAutoAdjustScaleDown())
+ .build());
+
+ if (!name.equals(distributionZoneCfg.name())) {
+ catalogFut = catalogFut.thenCompose(ignore -> catalogManager.renameDistributionZone(RenameZoneParams.builder()
+ .zoneName(name)
+ .newZoneName(distributionZoneCfg.name())
+ .build()));
+ }
+
CompletableFuture<Void> change;
if (DEFAULT_ZONE_NAME.equals(name)) {
@@ -524,7 +580,7 @@ public class DistributionZoneManager implements IgniteComponent {
}));
}
- change.whenComplete((res, e) -> {
+ allOf(catalogFut, change).whenComplete((res, e) -> {
if (e != null) {
fut.completeExceptionally(
unwrapDistributionZoneException(
@@ -603,7 +659,11 @@ public class DistributionZoneManager implements IgniteComponent {
}
});
- return fut;
+ CompletableFuture<Void> dropZoneFut = catalogManager.dropDistributionZone(DropZoneParams.builder()
+ .zoneName(name)
+ .build());
+
+ return allOf(dropZoneFut, fut);
} finally {
busyLock.leaveBusy();
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index 9c69f09930..a9177185e0 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -126,6 +126,7 @@ public class BaseDistributionZoneManagerTest extends BaseIgniteAbstractTest {
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfiguration,
+ DistributionZonesTestUtil.mockCatalog(),
metaStorageManager,
new LogicalTopologyServiceImpl(topology, cmgManager),
vaultMgr,
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 713ee78a4e..8f58ea7b78 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -146,6 +146,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfiguration,
+ DistributionZonesTestUtil.mockCatalog(),
metaStorageManager,
logicalTopologyService,
vaultMgr,
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index 5bd1324a97..8c1bb0fc2a 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -99,6 +99,7 @@ class DistributionZoneManagerTest extends IgniteAbstractTest {
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfiguration,
+ DistributionZonesTestUtil.mockCatalog(),
null,
null,
null,
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java
index 75db4d543f..17ede4c011 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -44,6 +45,7 @@ public class DistributionZoneMockTest {
DistributionZoneManager zoneMgr = new DistributionZoneManager(
zonesConfiguration,
mock(TablesConfiguration.class),
+ mock(CatalogManager.class),
mock(MetaStorageManager.class),
mock(LogicalTopologyService.class),
mock(VaultManager.class),
diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 1cdabe81d1..729f15d143 100644
--- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.distributionzones;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
@@ -41,6 +42,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.Map;
import java.util.Objects;
@@ -48,9 +54,12 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -486,4 +495,29 @@ public class DistributionZonesTestUtil {
assertThat(dataNodes, is(expectedValueNames));
}
}
+
+ /**
+ * Creates a CatalogManager mock: zone commands complete immediately, zone lookups return descriptor mocks with ids 1, 2, ...
+ */
+ public static CatalogManager mockCatalog() {
+ AtomicInteger idGen = new AtomicInteger(0);
+
+ CatalogManager catalogManager = mock(CatalogManager.class);
+ when(catalogManager.createDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.alterDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.renameDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.dropDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.zone(anyString(), anyLong())).thenAnswer(invocation -> zoneDescriptorMock(idGen));
+
+ return catalogManager;
+ }
+
+ private static CatalogZoneDescriptor zoneDescriptorMock(AtomicInteger idGen) {
+ int zoneId = idGen.incrementAndGet();
+
+ CatalogZoneDescriptor descriptor = mock(CatalogZoneDescriptor.class);
+ when(descriptor.id()).thenReturn(zoneId);
+
+ return descriptor;
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 77b022c0d4..74573201f7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -795,6 +795,7 @@ public class ItRebalanceDistributedTest {
distributionZoneManager = new DistributionZoneManager(
zonesCfg,
tablesCfg,
+ catalogManager,
metaStorageManager,
logicalTopologyService,
vaultManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index daff409fc2..3b2174a6e0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.configuration.validation.ConfigurationValidato
import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -204,6 +205,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfiguration,
+ DistributionZonesTestUtil.mockCatalog(),
metaStorageMgr,
logicalTopologyService,
vault,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7a9b323950..2d3c724058 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -338,24 +338,25 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
+ var clockWaiter = new ClockWaiter("test", hybridClock);
+
+ var catalogManager = new CatalogServiceImpl(
+ new UpdateLogImpl(metaStorageMgr),
+ clockWaiter
+ );
+
SchemaManager schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr);
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
zonesConfig,
tablesConfig,
+ catalogManager,
metaStorageMgr,
logicalTopologyService,
vault,
name
);
- var clockWaiter = new ClockWaiter("test", hybridClock);
-
- var catalogManager = new CatalogServiceImpl(
- new UpdateLogImpl(metaStorageMgr),
- clockWaiter
- );
-
TableManager tableManager = new TableManager(
name,
registry,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d89c111b1a..c4eda773d6 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -497,9 +497,20 @@ public class IgniteImpl implements Ignite {
schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr);
+ SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration(
+ SchemaSynchronizationConfiguration.KEY
+ );
+
+ catalogManager = new CatalogServiceImpl(
+ new UpdateLogImpl(metaStorageMgr),
+ clockWaiter,
+ () -> schemaSyncConfig.delayDuration().value()
+ );
+
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfig,
+ catalogManager,
metaStorageMgr,
logicalTopologyService,
vaultMgr,
@@ -510,16 +521,6 @@ public class IgniteImpl implements Ignite {
outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService());
- SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration(
- SchemaSynchronizationConfiguration.KEY
- );
-
- catalogManager = new CatalogServiceImpl(
- new UpdateLogImpl(metaStorageMgr),
- clockWaiter,
- () -> schemaSyncConfig.delayDuration().value()
- );
-
distributedTblMgr = new TableManager(
name,
registry,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
index 912c66fe3b..a7a9b7f78e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
@@ -28,19 +28,13 @@ import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterColumnCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableAddCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableDropCommand;
-import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterZoneRenameCommand;
-import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterZoneSetCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateIndexCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
-import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateZoneCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DropIndexCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DropTableCommand;
-import org.apache.ignite.internal.sql.engine.prepare.ddl.DropZoneCommand;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.lang.DistributionZoneAlreadyExistsException;
-import org.apache.ignite.lang.DistributionZoneNotFoundException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.TableAlreadyExistsException;
@@ -120,34 +114,6 @@ public class DdlCommandHandlerWrapper extends DdlCommandHandler {
.thenCompose(res -> catalogManager.dropIndex(DdlToCatalogCommandConverter.convert((DropIndexCommand) cmd))
.handle(handleModificationResult(((DropIndexCommand) cmd).ifNotExists(), IndexNotFoundException.class))
);
- } else if (cmd instanceof CreateZoneCommand) {
- CreateZoneCommand zoneCommand = (CreateZoneCommand) cmd;
-
- return ddlCommandFuture
- .thenCompose(res -> catalogManager.createDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand))
- .handle(handleModificationResult(zoneCommand.ifNotExists(), DistributionZoneAlreadyExistsException.class))
- );
- } else if (cmd instanceof DropZoneCommand) {
- DropZoneCommand zoneCommand = (DropZoneCommand) cmd;
-
- return ddlCommandFuture
- .thenCompose(res -> catalogManager.dropDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand))
- .handle(handleModificationResult(zoneCommand.ifExists(), DistributionZoneNotFoundException.class))
- );
- } else if (cmd instanceof AlterZoneRenameCommand) {
- AlterZoneRenameCommand zoneCommand = (AlterZoneRenameCommand) cmd;
-
- return ddlCommandFuture
- .thenCompose(res -> catalogManager.renameDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand))
- .handle(handleModificationResult(zoneCommand.ifExists(), DistributionZoneNotFoundException.class))
- );
- } else if (cmd instanceof AlterZoneSetCommand) {
- AlterZoneSetCommand zoneCommand = (AlterZoneSetCommand) cmd;
-
- return ddlCommandFuture
- .thenCompose(res -> catalogManager.alterDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand))
- .handle(handleModificationResult(zoneCommand.ifExists(), DistributionZoneNotFoundException.class))
- );
}
return ddlCommandFuture;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
index 0c42f9fe49..b9e810ecd0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
@@ -17,23 +17,26 @@
package org.apache.ignite.internal.sql.engine.exec.ddl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.ignite.configuration.NamedConfigurationTree;
-import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
@@ -42,9 +45,6 @@ import org.apache.ignite.internal.distributionzones.DistributionZoneConfiguratio
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.index.IndexManager;
-import org.apache.ignite.internal.schema.configuration.TableChange;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateZoneCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DropZoneCommand;
import org.apache.ignite.internal.storage.DataStorageManager;
@@ -106,17 +106,19 @@ public class DdlCommandHandlerExceptionHandlingTest extends IgniteAbstractTest {
DistributionZonesConfiguration zonesConfiguration = registry.getConfiguration(DistributionZonesConfiguration.KEY);
- NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = mock(NamedConfigurationTree.class);
-
- NamedListView<TableView> value = mock(NamedListView.class);
-
- when(tables.value()).thenReturn(value);
-
- when(value.namedListKeys()).thenReturn(new ArrayList<>());
+ CatalogManager catalogManager = mock(CatalogManager.class);
+ when(catalogManager.createDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.alterDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.renameDistributionZone(any())).thenReturn(completedFuture(null));
+ when(catalogManager.dropDistributionZone(any())).thenReturn(completedFuture(null));
+ CatalogZoneDescriptor desc = mock(CatalogZoneDescriptor.class);
+ when(catalogManager.zone(anyString(), anyLong())).thenReturn(desc);
+ when(desc.id()).thenReturn(42);
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
null,
+ catalogManager,
null,
null,
null,