You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/03/16 22:56:13 UTC

[ignite-3] 01/01: WIP. Split into CatalogService and CatalogManager. Add DdlHandler wrapper. Add CreateTable command.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-18535
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 1d827d291c87bd9f826e4a908db9d830e3a23872
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Mar 16 18:35:48 2023 +0300

    WIP. Split into CatalogService and CatalogManager. Add DdlHandler wrapper. Add CreateTable command.
---
 modules/catalog/build.gradle                       |   4 +-
 .../ignite/internal/catalog/CatalogManager.java}   |  27 ++-
 .../internal/catalog/CatalogManagerImpl.java       |  57 +++++++
 .../ignite/internal/catalog/CatalogService.java    |  14 +-
 .../internal/catalog/CatalogServiceImpl.java       |  53 ++++--
 .../ColumnParams.java}                             |  71 ++++----
 .../catalog/commands/CreateTableParams.java        | 184 +++++++++++++++++++++
 .../catalog/commands/DefaultValueParams.java       |  97 +++++++++++
 .../catalog/descriptors/CatalogDescriptor.java     |  18 +-
 .../catalog/descriptors/ObjectDescriptor.java      |   5 +
 .../catalog/descriptors/SchemaDescriptor.java      |   9 +
 .../catalog/descriptors/TableColumnDescriptor.java |  11 ++
 .../catalog/descriptors/TableDescriptor.java       |   9 +
 modules/runner/build.gradle                        |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   8 +-
 modules/sql-engine/build.gradle                    |   1 +
 .../internal/sql/engine/SqlQueryProcessor.java     |  13 +-
 .../engine/exec/ddl/DdlCommandHandlerWrapper.java  |  61 +++++++
 .../exec/ddl/DdlToCatalogCommandConverter.java     |  67 ++++++++
 .../internal/sql/engine/StopCalciteModuleTest.java |   7 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   7 +-
 21 files changed, 647 insertions(+), 77 deletions(-)

diff --git a/modules/catalog/build.gradle b/modules/catalog/build.gradle
index a126476f48..6745603773 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/build.gradle
@@ -27,10 +27,8 @@ dependencies {
     implementation project(':ignite-api')
     implementation project(':ignite-core')
     implementation project(':ignite-configuration')
+    implementation project(':ignite-metastorage-api')
     implementation libs.jetbrains.annotations
-
-    testImplementation(testFixtures(project(':ignite-configuration')))
-    testImplementation(testFixtures(project(':ignite-core')))
 }
 
 description = "ignite-catalog"
diff --git a/modules/catalog/build.gradle b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
similarity index 53%
copy from modules/catalog/build.gradle
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index a126476f48..8feea3d3e5 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.catalog;
 
-dependencies {
-    annotationProcessor project(':ignite-configuration-annotation-processor')
-    annotationProcessor libs.auto.service
-    implementation libs.jetbrains.annotations
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.CreateTableParams;
 
-    implementation project(':ignite-api')
-    implementation project(':ignite-core')
-    implementation project(':ignite-configuration')
-    implementation libs.jetbrains.annotations
+/**
+ * Catalog manager provides schema manipulation methods and is responsible for managing distributed operations.
+ */
+public interface CatalogManager {
+    //TODO: IGNITE-18535 Remove when all versioned schema stuff will be moved to Catalog.
+    @Deprecated(forRemoval = true)
+    boolean USE_CATALOG = Boolean.getBoolean("IGNITE_USE_CATALOG");
 
-    testImplementation(testFixtures(project(':ignite-configuration')))
-    testImplementation(testFixtures(project(':ignite-core')))
+    //TODO: IGNITE-18535 enrich with schema manipulation methods.
+    CompletableFuture<?> createTable(CreateTableParams command);
 }
-
-description = "ignite-catalog"
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
new file mode 100644
index 0000000000..771a3a6da9
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.CreateTableParams;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+
+/**
+ * Catalog manager implementation.
+ */
+public class CatalogManagerImpl implements CatalogManager {
+    //    private final CatalogService catalogService;
+    //    private final MetaStorageManager metaStorage;
+
+    /**
+     * Constructor.
+     */
+    public CatalogManagerImpl(CatalogService catalogService, MetaStorageManager metaStorage) {
+        //        this.catalogService = catalogService;
+        //
+        //        this.metaStorage = metaStorage;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<?> createTable(CreateTableParams params) {
+        // Create the operation future, which will be returned and saved to a map.
+        CompletableFuture<Object> opFuture = CompletableFuture.completedFuture(null);
+
+        // Creates TableDescriptor and saves it to MetaStorage.
+        // Atomically:
+        //        int tableId = metaStorage.get("lastTableId");
+        //        metaStorage.put("table-"+tableId, new TableDescriptor(tableId, params));
+        //        metaStorage.put("lastTableId", tableId+1);
+
+        // Subscribes the operation future to the MetaStorage future for failure handling.
+        // The operation future must be completed when the event for the expected table is received from the catalog service.
+
+        return opFuture;
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 0767cbde36..26885f90a3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -18,21 +18,20 @@
 package org.apache.ignite.internal.catalog;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
-import org.apache.ignite.internal.configuration.DynamicConfigurationChanger;
 
 /**
  * Catalog service provides methods to access schema object's descriptors of exact version and/or last actual version at given timestamp,
  * which is logical point-in-time.
  *
- * <p>Catalog service is responsible for proper configuration updates and storing/restoring schema evolution history (schema versions)
- * for time-travelled queries purposes and lazy data evolution purposes.
+ * <p>Catalog service listens for distributed schema update events and stores/restores schema evolution history (schema versions) for
+ * time-travelled queries and for lazy data evolution purposes.
  *
- * <p>TBD: schema manipulation methods.
- * TBD: events
+ * <p>TBD: events
+ *
+ * @see org.apache.ignite.internal.catalog.events.CatalogEvent
  */
 public interface CatalogService {
     TableDescriptor table(String tableName, long timestamp);
@@ -46,7 +45,4 @@ public interface CatalogService {
     SchemaDescriptor schema(int version);
 
     SchemaDescriptor activeSchema(long timestamp);
-
-    //TODO: IGNITE-18535 enrich with schema manipulation methods.
-    CompletableFuture<SchemaDescriptor> updateSchema(DynamicConfigurationChanger changer);
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index e46d1bc788..3ea85ccfb0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.catalog;
 
 import java.util.Collection;
 import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -30,20 +29,44 @@ import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.configuration.DynamicConfigurationChanger;
 import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.lang.ByteArray;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * TODO: IGNITE-18535 Fix javadoc
+ * Catalog service implementation.
  */
 public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParameters> implements CatalogService {
-
     /** Versioned catalog descriptors. */
-    private final ConcurrentMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentHashMap<>(); //TODO: IGNITE-18535 Use IntMap instead.
+    //TODO: IGNITE-18535 Use copy-on-write approach with IntMap instead??
+    private final ConcurrentMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentHashMap<>();
 
     /** Versioned catalog descriptors sorted in chronological order. */
-    private final ConcurrentNavigableMap<Long, CatalogDescriptor> catalogByTs = new ConcurrentSkipListMap<>(); //TODO: IGNITE-18535 Use LongMap instead.
+    //TODO: IGNITE-18535 Use copy-on-write approach with Map instead??
+    private final ConcurrentNavigableMap<Long, CatalogDescriptor> catalogByTs = new ConcurrentSkipListMap<>();
+
+    private final MetaStorageManager metaStorageMgr;
+
+    private final WatchListener catalogVersionsListener;
+
+    /**
+     * Constructor.
+     */
+    public CatalogServiceImpl(MetaStorageManager metaStorageMgr) {
+        this.metaStorageMgr = metaStorageMgr;
+        catalogVersionsListener = new CatalogEventListener();
+    }
+
+    public void start() {
+        metaStorageMgr.registerPrefixWatch(ByteArray.fromString("catalog-"), catalogVersionsListener);
+    }
+
+    public void stop() {
+        metaStorageMgr.unregisterWatch(catalogVersionsListener);
+    }
 
     /** {@inheritDoc} */
     @Override
@@ -81,12 +104,6 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
         return catalogAt(timestamp).schema(CatalogDescriptor.DEFAULT_SCHEMA_NAME);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<SchemaDescriptor> updateSchema(DynamicConfigurationChanger changer) {
-        return null;
-    }
-
     private CatalogDescriptor catalog(int version) {
         return catalogByVer.get(version);
     }
@@ -100,4 +117,16 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
         return entry.getValue();
     }
+
+    private static class CatalogEventListener implements WatchListener {
+        @Override
+        public void onUpdate(WatchEvent event) {
+
+        }
+
+        @Override
+        public void onError(Throwable e) {
+
+        }
+    }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/ColumnParams.java
similarity index 51%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/ColumnParams.java
index 4a7d9aeb60..7c7dfbb18c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/ColumnParams.java
@@ -15,61 +15,72 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.catalog.descriptors;
+package org.apache.ignite.internal.catalog.commands;
 
-import java.io.Serializable;
 import java.util.Objects;
-import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.sql.ColumnType;
 
-/**
- * Table column descriptor.
- */
-public class TableColumnDescriptor implements Serializable {
-    private static final long serialVersionUID = 7684890562398520509L;
-
+/** Defines a particular column within a table. */
+public class ColumnParams {
     private final String name;
+
     private final ColumnType type;
+
     private final boolean nullable;
-    /** Max length constraint. */
-    private int length;
-    private int precision;
-    private int scale;
-    private String defaultValueExpression;
 
-    public TableColumnDescriptor(String name, ColumnType type, boolean nullable) {
+    private final DefaultValueParams defaultValueDefinition;
+
+    /** Creates a column definition. */
+    public ColumnParams(String name, ColumnType type, DefaultValueParams defaultValueDefinition, boolean nullable) {
         this.name = Objects.requireNonNull(name, "name");
-        this.type = Objects.requireNonNull(type);
+        this.type = Objects.requireNonNull(type, "type");
+        this.defaultValueDefinition = Objects.requireNonNull(defaultValueDefinition, "defaultValueDefinition");
         this.nullable = nullable;
     }
 
+    /**
+     * Get column's name.
+     */
     public String name() {
         return name;
     }
 
-    public boolean nullable() {
-        return nullable;
-    }
-
+    /**
+     * Get column's type.
+     */
     public ColumnType type() {
         return type;
     }
 
-    public int precision() {
-        return precision;
+    /**
+     * Returns default value definition.
+     *
+     * @param <T> Desired subtype of the definition.
+     * @return Default value definition.
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends DefaultValueParams> T defaultValueDefinition() {
+        return (T) defaultValueDefinition;
     }
 
-    public int scale() {
-        return scale;
+    /**
+     * Get nullable flag: {@code true} if this column accepts nulls.
+     */
+    public boolean nullable() {
+        return nullable;
     }
 
-    public int length() {
-        return length;
+    /**
+     * Get column's precision.
+     */
+    public Integer precision() {
+        return null;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-        return S.toString(this);
+    /**
+     * Get column's scale.
+     */
+    public Integer scale() {
+        return null;
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
new file mode 100644
index 0000000000..d9fc08e73b
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * CREATE TABLE statement.
+ */
+public class CreateTableParams {
+    /** Replicas number. */
+    private Integer replicas;
+
+    /** Number of partitions for the new table. */
+    private Integer partitions;
+
+    /** Primary key columns. */
+    private List<String> pkCols;
+
+    /** Colocation columns. */
+    private List<String> colocationCols;
+
+    /** Columns. */
+    private List<ColumnParams> cols;
+
+    private String dataStorage;
+
+    @Nullable
+    private Map<String, Object> dataStorageOptions;
+
+    private String zone;
+
+    /**
+     * Get primary key columns.
+     */
+    public List<String> primaryKeyColumns() {
+        return pkCols;
+    }
+
+    /**
+     * Set primary key columns.
+     */
+    public void primaryKeyColumns(List<String> pkCols) {
+        this.pkCols = pkCols;
+    }
+
+    /**
+     * Get replicas count.
+     */
+    @Nullable
+    public Integer replicas() {
+        return replicas;
+    }
+
+    /**
+     * Set replicas count.
+     */
+    @Nullable
+    public void replicas(int repl) {
+        replicas = repl;
+    }
+
+    /**
+     * Get partitions count.
+     */
+    @Nullable
+    public Integer partitions() {
+        return partitions;
+    }
+
+    /**
+     * Set partitions count.
+     */
+    public void partitions(Integer parts) {
+        partitions = parts;
+    }
+
+    /**
+     * Get table columns.
+     *
+     * @return Columns.
+     */
+    public List<ColumnParams> columns() {
+        return cols;
+    }
+
+    /**
+     * Set table columns.
+     *
+     * @param cols Columns.
+     */
+    public void columns(List<ColumnParams> cols) {
+        this.cols = cols;
+    }
+
+    /**
+     * Get colocation column names.
+     *
+     * @return Colocation column names.
+     */
+    @Nullable
+    public List<String> colocationColumns() {
+        return colocationCols;
+    }
+
+    /**
+     * Set colocation column names.
+     *
+     * @param colocationCols Colocation column names.
+     */
+    public void colocationColumns(List<String> colocationCols) {
+        this.colocationCols = colocationCols;
+    }
+
+    /**
+     * Returns data storage.
+     */
+    public String dataStorage() {
+        return dataStorage;
+    }
+
+    /**
+     * Sets data storage.
+     *
+     * @param dataStorage Data storage.
+     */
+    public void dataStorage(String dataStorage) {
+        this.dataStorage = dataStorage;
+    }
+
+    /**
+     * Returns data storage options.
+     */
+    public Map<String, Object> dataStorageOptions() {
+        return dataStorageOptions == null ? Map.of() : dataStorageOptions;
+    }
+
+    /**
+     * Adds data storage option.
+     *
+     * @param name Option name.
+     * @param value Option value.
+     */
+    public void addDataStorageOption(String name, Object value) {
+        if (dataStorageOptions == null) {
+            dataStorageOptions = new HashMap<>();
+        }
+
+        dataStorageOptions.put(name, value);
+    }
+
+    /**
+     * Get zone name.
+     */
+    @Nullable
+    public String zone() {
+        return zone;
+    }
+
+    /**
+     * Set zone name.
+     */
+    public void zone(String zoneName) {
+        this.zone = zoneName;
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DefaultValueParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DefaultValueParams.java
new file mode 100644
index 0000000000..1e8d9227ab
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DefaultValueParams.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import java.util.Objects;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Definition of value provider to use as default.
+ */
+@SuppressWarnings("PublicInnerClass")
+public class DefaultValueParams {
+    /**
+     * Defines value provider as functional provider.
+     *
+     * @param name Name of the function to invoke to generate the value
+     * @return Default value definition.
+     */
+    public static DefaultValueParams functionCall(String name) {
+        return new FunctionCall(Objects.requireNonNull(name, "name"));
+    }
+
+    /**
+     * Defines value provider as a constant value provider.
+     *
+     * @param value A value to use as default.
+     * @return Default value definition.
+     */
+    public static DefaultValueParams constant(@Nullable Object value) {
+        return new ConstantValue(value);
+    }
+
+    /** Types of the defaults. */
+    public enum Type {
+        /** Default is specified as a constant. */
+        CONSTANT,
+
+        /** Default is specified as a call to a function. */
+        FUNCTION_CALL
+    }
+
+    protected final Type type;
+
+    private DefaultValueParams(Type type) {
+        this.type = type;
+    }
+
+    /** Returns type of the default value. */
+    public Type type() {
+        return type;
+    }
+
+    /** Defines default value provider as a function call. */
+    public static class FunctionCall extends DefaultValueParams {
+        private final String functionName;
+
+        private FunctionCall(String functionName) {
+            super(Type.FUNCTION_CALL);
+            this.functionName = functionName;
+        }
+
+        /** Returns name of the function to use as value generator. */
+        public String functionName() {
+            return functionName;
+        }
+    }
+
+    /** Defines default value provider as a constant. */
+    public static class ConstantValue extends DefaultValueParams {
+        private final Object value;
+
+        private ConstantValue(Object value) {
+            super(Type.CONSTANT);
+            this.value = value;
+        }
+
+        /** Returns value to use as default. */
+        public Object value() {
+            return value;
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java
index 9f5ba39ae3..d31a838e56 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java
@@ -34,7 +34,9 @@ import org.apache.ignite.internal.tostring.S;
  */
 public class CatalogDescriptor implements Serializable {
     public static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
+
     private static final long serialVersionUID = -2713639412596667759L;
+
     private final int version;
     private final long activationTimestamp;
     private final Map<String, SchemaDescriptor> schemas;
@@ -44,7 +46,13 @@ public class CatalogDescriptor implements Serializable {
     @IgniteToStringExclude
     private transient Map<Integer, IndexDescriptor> indexesMap;
 
-
+    /**
+     * Constructor.
+     *
+     * @param version Catalog version.
+     * @param activationTimestamp Catalog activation timestamp.
+     * @param descriptors Schema descriptors.
+     */
     public CatalogDescriptor(int version, long activationTimestamp, SchemaDescriptor[] descriptors) {
         this.version = version;
         this.activationTimestamp = activationTimestamp;
@@ -59,6 +67,10 @@ public class CatalogDescriptor implements Serializable {
         rebuildMaps();
     }
 
+    public int version() {
+        return version;
+    }
+
     public long time() {
         return activationTimestamp;
     }
@@ -82,8 +94,8 @@ public class CatalogDescriptor implements Serializable {
     private void rebuildMaps() {
         tablesMap = schemas.values().stream().flatMap(s -> Arrays.stream(s.tables()))
                 .collect(Collectors.toUnmodifiableMap(ObjectDescriptor::id, Function.identity()));
-        indexesMap = schemas.values().stream().flatMap(s -> Arrays.stream(s.indexes())).
-                collect(Collectors.toUnmodifiableMap(ObjectDescriptor::id, Function.identity()));
+        indexesMap = schemas.values().stream().flatMap(s -> Arrays.stream(s.indexes()))
+                .collect(Collectors.toUnmodifiableMap(ObjectDescriptor::id, Function.identity()));
     }
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java
index eabd9e5dff..5d019b3de3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java
@@ -47,6 +47,11 @@ public abstract class ObjectDescriptor implements Serializable {
         return name;
     }
 
+    /** Return schema-object type. */
+    public Type type() {
+        return type;
+    }
+
     /** {@inheritDoc} */
     @Override
     public String toString() {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java
index 4a4c53736f..0c82cf68a4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java
@@ -42,6 +42,15 @@ public class SchemaDescriptor extends ObjectDescriptor {
     @IgniteToStringExclude
     private transient Map<String, IndexDescriptor> indexesMap; //TODO: IGNITE-18535 Drop if not used.
 
+    /**
+     * Constructor.
+     *
+     * @param id Schema id.
+     * @param name Schema name.
+     * @param version Catalog version.
+     * @param tables Tables description.
+     * @param indexes Indexes description.
+     */
     public SchemaDescriptor(int id, String name, int version, TableDescriptor[] tables, IndexDescriptor[] indexes) {
         super(id, Type.SCHEMA, name);
         this.version = version;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
index 4a7d9aeb60..4feafff9fa 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
@@ -37,6 +37,13 @@ public class TableColumnDescriptor implements Serializable {
     private int scale;
     private String defaultValueExpression;
 
+    /**
+     * Constructor.
+     *
+     * @param name Column name.
+     * @param type Column type.
+     * @param nullable Nullability flag.
+     */
     public TableColumnDescriptor(String name, ColumnType type, boolean nullable) {
         this.name = Objects.requireNonNull(name, "name");
         this.type = Objects.requireNonNull(type);
@@ -67,6 +74,10 @@ public class TableColumnDescriptor implements Serializable {
         return length;
     }
 
+    public String defaultValueExpression() {
+        return defaultValueExpression;
+    }
+
     /** {@inheritDoc} */
     @Override
     public String toString() {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
index 6babceaf33..30569843b0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
@@ -46,6 +46,15 @@ public class TableDescriptor extends ObjectDescriptor {
     @IgniteToStringExclude
     private transient Map<String, TableColumnDescriptor> columnsMap;
 
+    /**
+     * Constructor.
+     *
+     * @param id Table id.
+     * @param name Table name.
+     * @param columns Table column descriptors.
+     * @param pkCols Primary key column names.
+     * @param colocationCols Colocation column names.
+     */
     public TableDescriptor(int id, String name, TableColumnDescriptor[] columns, String[] pkCols, @Nullable String[] colocationCols) {
         super(id, Type.TABLE, name);
 
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 5a23568f95..4f79e0bd90 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -57,6 +57,7 @@ dependencies {
     implementation project(':ignite-placement-driver')
     implementation project(':ignite-code-deployment')
     implementation project(':ignite-security')
+    implementation project(':ignite-catalog')
     implementation libs.jetbrains.annotations
     implementation libs.micronaut.inject
     implementation libs.micronaut.validation
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 87c3779524..999db9782b 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -45,6 +45,9 @@ import org.apache.ignite.client.handler.ClientHandlerModule;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.deployment.IgniteDeployment;
 import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.CatalogServiceImpl;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -487,6 +490,8 @@ public class IgniteImpl implements Ignite {
 
         indexManager = new IndexManager(tablesConfiguration, schemaManager, distributedTblMgr);
 
+        CatalogService catalogService = new CatalogServiceImpl(metaStorageMgr);
+
         qryEngine = new SqlQueryProcessor(
                 registry,
                 clusterSvc,
@@ -498,7 +503,8 @@ public class IgniteImpl implements Ignite {
                 distributionZoneManager,
                 () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
                 replicaSvc,
-                clock
+                clock,
+                new CatalogManagerImpl(catalogService, metaStorageMgr)
         );
 
         sql = new IgniteSqlImpl(qryEngine);
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 944f2ed923..e8af2d0ac0 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -33,6 +33,7 @@ dependencies {
     implementation project(':ignite-transactions')
     implementation project(':ignite-replicator')
     implementation project(':ignite-distribution-zones')
+    implementation project(':ignite-catalog')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.caffeine
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index c4ea27f00a..6b7b4a32cb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -43,6 +43,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.index.IndexManager;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -151,6 +153,9 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Clock. */
     private final HybridClock clock;
 
+    /** Distributed catalog manager. */
+    private CatalogManager catalogManager;
+
     /** Constructor. */
     public SqlQueryProcessor(
             Consumer<Function<Long, CompletableFuture<?>>> registry,
@@ -163,7 +168,8 @@ public class SqlQueryProcessor implements QueryProcessor {
             DistributionZoneManager distributionZoneManager,
             Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier,
             ReplicaService replicaService,
-            HybridClock clock
+            HybridClock clock,
+            CatalogManager catalogManager
     ) {
         this.registry = registry;
         this.clusterSrvc = clusterSrvc;
@@ -176,6 +182,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
         this.replicaService = replicaService;
         this.clock = clock;
+        this.catalogManager = catalogManager;
     }
 
     /** {@inheritDoc} */
@@ -220,7 +227,9 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         this.prepareSvc = prepareSvc;
 
-        var ddlCommandHandler = new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager);
+        var ddlCommandHandler = CatalogManager.USE_CATALOG
+                ? new DdlCommandHandlerWrapper(distributionZoneManager, tableManager, indexManager, dataStorageManager, catalogManager)
+                : new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager);
 
         var executionSrvc = registerService(ExecutionServiceImpl.create(
                 clusterSrvc.topologyService(),
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
new file mode 100644
index 0000000000..f9c02a2d9f
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.ddl;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.index.IndexManager;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.table.distributed.TableManager;
+
+/**
+ * DDL command handler wrapper that additionally passes {@code CREATE TABLE} commands to the {@link CatalogManager}.
+ */
+public class DdlCommandHandlerWrapper extends DdlCommandHandler {
+    /** Catalog manager that receives the converted {@code CREATE TABLE} parameters. */
+    private final CatalogManager catalogManager;
+
+    /**
+     * Constructor; every argument except {@code catalogManager} (which must not be null) goes to the parent handler.
+     */
+    public DdlCommandHandlerWrapper(
+            DistributionZoneManager distributionZoneManager,
+            TableManager tableManager,
+            IndexManager indexManager,
+            DataStorageManager dataStorageManager,
+            CatalogManager catalogManager
+    ) {
+        super(distributionZoneManager, tableManager, indexManager, dataStorageManager);
+
+        this.catalogManager = Objects.requireNonNull(catalogManager, "catalogManager");
+    }
+
+    /** Passes {@code CREATE TABLE} commands to the catalog manager (the call's result is not used), then delegates to the parent. */
+    @Override
+    public CompletableFuture<Boolean> handle(DdlCommand cmd) {
+        if (cmd instanceof CreateTableCommand) {
+            catalogManager.createTable(DdlToCatalogCommandConverter.convert((CreateTableCommand) cmd));
+        }
+
+        return super.handle(cmd);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
new file mode 100644
index 0000000000..a4756079ec
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.ddl;
+
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateTableParams;
+import org.apache.ignite.internal.catalog.commands.DefaultValueParams;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.ColumnDefinition;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DefaultValueDefinition;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+
+/** Converter for DDL command classes to Catalog command params classes. */
+final class DdlToCatalogCommandConverter {
+    /** Builds {@link CreateTableParams} by copying the columns, key, distribution and storage settings of {@code cmd}. */
+    static CreateTableParams convert(CreateTableCommand cmd) {
+        CreateTableParams params = new CreateTableParams();
+
+        params.columns(cmd.columns().stream().map(DdlToCatalogCommandConverter::convert).collect(Collectors.toList()));
+        params.colocationColumns(cmd.colocationColumns());
+        params.primaryKeyColumns(cmd.primaryKeyColumns());
+
+        params.partitions(cmd.partitions());
+        params.replicas(cmd.replicas());
+        params.zone(cmd.zone());
+
+        params.dataStorage(cmd.dataStorage());
+        cmd.dataStorageOptions().forEach(params::addDataStorageOption);
+
+        return params;
+    }
+
+    /** Converts a column definition (name, type, default value, nullability) to {@link ColumnParams}. */
+    private static ColumnParams convert(ColumnDefinition def) {
+        return new ColumnParams(def.name(), TypeUtils.columnType(def.type()), convert(def.defaultValueDefinition()), def.nullable());
+    }
+
+    /** Maps a constant or function-call default value definition to {@link DefaultValueParams}; other types are rejected. */
+    private static DefaultValueParams convert(DefaultValueDefinition def) {
+        switch (def.type()) {
+            case CONSTANT:
+                return DefaultValueParams.constant(((DefaultValueDefinition.ConstantValue) def).value());
+
+            case FUNCTION_CALL:
+                return DefaultValueParams.functionCall(((DefaultValueDefinition.FunctionCall) def).functionName());
+
+            default:
+                throw new IllegalArgumentException("Unsupported default value definition type: " + def.type());
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 5c800ed23c..f67a825ea4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Flow;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.index.IndexManager;
@@ -134,6 +135,9 @@ public class StopCalciteModuleTest {
     @Mock
     private HybridClock clock;
 
+    @Mock
+    private CatalogManager catalogManager;
+
     private SchemaRegistry schemaReg;
 
     private final TestRevisionRegister testRevisionRegister = new TestRevisionRegister();
@@ -224,7 +228,8 @@ public class StopCalciteModuleTest {
                 distributionZoneManager,
                 Map::of,
                 mock(ReplicaService.class),
-                clock
+                clock,
+                catalogManager
         );
 
         when(tbl.tableId()).thenReturn(UUID.randomUUID());
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 7a159d6e6d..4d87d47f24 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListenerHolder;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -137,6 +138,9 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     @Mock
     HybridClock clock;
 
+    @Mock
+    CatalogManager catalogManager;
+
     /**
      * Revision listener holder. It uses for the test configurations:
      * <ul>
@@ -260,7 +264,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                         )
                 ),
                 mock(ReplicaService.class),
-                clock
+                clock,
+                catalogManager
         );
 
         queryProc.start();