You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/03/16 22:56:13 UTC
[ignite-3] 01/01: WIP. Split into CatalogService and CatalogManager. Add DdlHandler wrapper. Add CreateTable command.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-18535
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 1d827d291c87bd9f826e4a908db9d830e3a23872
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Mar 16 18:35:48 2023 +0300
WIP. Split into CatalogService and CatalogManager. Add DdlHandler wrapper. Add CreateTable command.
---
modules/catalog/build.gradle | 4 +-
.../ignite/internal/catalog/CatalogManager.java} | 27 ++-
.../internal/catalog/CatalogManagerImpl.java | 57 +++++++
.../ignite/internal/catalog/CatalogService.java | 14 +-
.../internal/catalog/CatalogServiceImpl.java | 53 ++++--
.../ColumnParams.java} | 71 ++++----
.../catalog/commands/CreateTableParams.java | 184 +++++++++++++++++++++
.../catalog/commands/DefaultValueParams.java | 97 +++++++++++
.../catalog/descriptors/CatalogDescriptor.java | 18 +-
.../catalog/descriptors/ObjectDescriptor.java | 5 +
.../catalog/descriptors/SchemaDescriptor.java | 9 +
.../catalog/descriptors/TableColumnDescriptor.java | 11 ++
.../catalog/descriptors/TableDescriptor.java | 9 +
modules/runner/build.gradle | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
modules/sql-engine/build.gradle | 1 +
.../internal/sql/engine/SqlQueryProcessor.java | 13 +-
.../engine/exec/ddl/DdlCommandHandlerWrapper.java | 61 +++++++
.../exec/ddl/DdlToCatalogCommandConverter.java | 67 ++++++++
.../internal/sql/engine/StopCalciteModuleTest.java | 7 +-
.../sql/engine/exec/MockedStructuresTest.java | 7 +-
21 files changed, 647 insertions(+), 77 deletions(-)
diff --git a/modules/catalog/build.gradle b/modules/catalog/build.gradle
index a126476f48..6745603773 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/build.gradle
@@ -27,10 +27,8 @@ dependencies {
implementation project(':ignite-api')
implementation project(':ignite-core')
implementation project(':ignite-configuration')
+ implementation project(':ignite-metastorage-api')
implementation libs.jetbrains.annotations
-
- testImplementation(testFixtures(project(':ignite-configuration')))
- testImplementation(testFixtures(project(':ignite-core')))
}
description = "ignite-catalog"
diff --git a/modules/catalog/build.gradle b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
similarity index 53%
copy from modules/catalog/build.gradle
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index a126476f48..8feea3d3e5 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.catalog;
-dependencies {
- annotationProcessor project(':ignite-configuration-annotation-processor')
- annotationProcessor libs.auto.service
- implementation libs.jetbrains.annotations
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.CreateTableParams;
- implementation project(':ignite-api')
- implementation project(':ignite-core')
- implementation project(':ignite-configuration')
- implementation libs.jetbrains.annotations
+/**
+ * Catalog manager provides schema manipulation methods and is responsible for managing distributed operations.
+ */
+public interface CatalogManager {
+ //TODO: IGNITE-18535 Remove when all versioned schema stuff will be moved to Catalog.
+ @Deprecated(forRemoval = true)
+ boolean USE_CATALOG = Boolean.getBoolean("IGNITE_USE_CATALOG");
- testImplementation(testFixtures(project(':ignite-configuration')))
- testImplementation(testFixtures(project(':ignite-core')))
+ //TODO: IGNITE-18535 enrich with schema manipulation methods.
+ CompletableFuture<?> createTable(CreateTableParams command);
}
-
-description = "ignite-catalog"
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
new file mode 100644
index 0000000000..771a3a6da9
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.CreateTableParams;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+
+/**
+ * Catalog manager implementation. WIP stub: dependencies are not stored yet and {@link #createTable} returns a completed future.
+ */
+public class CatalogManagerImpl implements CatalogManager {
+ // private final CatalogService catalogService;
+ // private final MetaStorageManager metaStorage;
+
+ /**
+ * Constructor. Both arguments are currently ignored (the field assignments are commented out).
+ */
+ public CatalogManagerImpl(CatalogService catalogService, MetaStorageManager metaStorage) {
+ // this.catalogService = catalogService;
+ //
+ // this.metaStorage = metaStorage;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> createTable(CreateTableParams params) {
+ // Create the operation future, which will be returned and saved to a map. For now it is created already completed.
+ CompletableFuture<Object> opFuture = CompletableFuture.completedFuture(null);
+
+ // Creates TableDescriptor and saves it to MetaStorage.
+ // Atomically:
+ // int tableId = metaStorage.get("lastTableId");
+ // metaStorage.put("table-"+tableId, new TableDescriptor(tableId, params));
+ // metaStorage.put("lastTableId", tableId+1);
+
+ // Subscribes the operation future to the MetaStorage future for failure handling.
+ // The operation future must be completed when an event for the expected table arrives from the catalog service.
+
+ return opFuture;
+ }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 0767cbde36..26885f90a3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -18,21 +18,20 @@
package org.apache.ignite.internal.catalog;
import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
-import org.apache.ignite.internal.configuration.DynamicConfigurationChanger;
/**
* Catalog service provides methods to access schema object's descriptors of exact version and/or last actual version at given timestamp,
* which is logical point-in-time.
*
- * <p>Catalog service is responsible for proper configuration updates and storing/restoring schema evolution history (schema versions)
- * for time-travelled queries purposes and lazy data evolution purposes.
+ * <p>Catalog service listens for distributed schema update events and stores/restores schema evolution history (schema versions)
+ * for time-travel queries and for lazy data evolution purposes.
*
- * <p>TBD: schema manipulation methods.
- * TBD: events
+ * <p>TBD: events
+ *
+ * @see org.apache.ignite.internal.catalog.events.CatalogEvent
*/
public interface CatalogService {
TableDescriptor table(String tableName, long timestamp);
@@ -46,7 +45,4 @@ public interface CatalogService {
SchemaDescriptor schema(int version);
SchemaDescriptor activeSchema(long timestamp);
-
- //TODO: IGNITE-18535 enrich with schema manipulation methods.
- CompletableFuture<SchemaDescriptor> updateSchema(DynamicConfigurationChanger changer);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index e46d1bc788..3ea85ccfb0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.catalog;
import java.util.Collection;
import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -30,20 +29,44 @@ import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.configuration.DynamicConfigurationChanger;
import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.Nullable;
/**
- * TODO: IGNITE-18535 Fix javadoc
+ * Catalog service implementation.
*/
public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParameters> implements CatalogService {
-
/** Versioned catalog descriptors. */
- private final ConcurrentMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentHashMap<>(); //TODO: IGNITE-18535 Use IntMap instead.
+ //TODO: IGNITE-18535 Use copy-on-write approach with IntMap instead??
+ private final ConcurrentMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentHashMap<>();
/** Versioned catalog descriptors sorted in chronological order. */
- private final ConcurrentNavigableMap<Long, CatalogDescriptor> catalogByTs = new ConcurrentSkipListMap<>(); //TODO: IGNITE-18535 Use LongMap instead.
+ //TODO: IGNITE-18535 Use copy-on-write approach with Map instead??
+ private final ConcurrentNavigableMap<Long, CatalogDescriptor> catalogByTs = new ConcurrentSkipListMap<>();
+
+ private final MetaStorageManager metaStorageMgr;
+
+ private final WatchListener catalogVersionsListener;
+
+ /**
+ * Constructor.
+ */
+ public CatalogServiceImpl(MetaStorageManager metaStorageMgr) {
+ this.metaStorageMgr = metaStorageMgr;
+ catalogVersionsListener = new CatalogEventListener();
+ }
+
+ public void start() {
+ metaStorageMgr.registerPrefixWatch(ByteArray.fromString("catalog-"), catalogVersionsListener);
+ }
+
+ public void stop() {
+ metaStorageMgr.unregisterWatch(catalogVersionsListener);
+ }
/** {@inheritDoc} */
@Override
@@ -81,12 +104,6 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
return catalogAt(timestamp).schema(CatalogDescriptor.DEFAULT_SCHEMA_NAME);
}
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<SchemaDescriptor> updateSchema(DynamicConfigurationChanger changer) {
- return null;
- }
-
private CatalogDescriptor catalog(int version) {
return catalogByVer.get(version);
}
@@ -100,4 +117,16 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
return entry.getValue();
}
+
+ private static class CatalogEventListener implements WatchListener {
+ @Override
+ public void onUpdate(WatchEvent event) {
+ // TODO: IGNITE-18535 No-op for now: watch events are not processed yet.
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ // TODO: IGNITE-18535 No-op for now: the error is silently ignored.
+ }
+ }
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/ColumnParams.java
similarity index 51%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/ColumnParams.java
index 4a7d9aeb60..7c7dfbb18c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/ColumnParams.java
@@ -15,61 +15,72 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.catalog.descriptors;
+package org.apache.ignite.internal.catalog.commands;
-import java.io.Serializable;
import java.util.Objects;
-import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.sql.ColumnType;
-/**
- * Table column descriptor.
- */
-public class TableColumnDescriptor implements Serializable {
- private static final long serialVersionUID = 7684890562398520509L;
-
+/** Defines a particular column within a table. */
+public class ColumnParams {
private final String name;
+
private final ColumnType type;
+
private final boolean nullable;
- /** Max length constraint. */
- private int length;
- private int precision;
- private int scale;
- private String defaultValueExpression;
- public TableColumnDescriptor(String name, ColumnType type, boolean nullable) {
+ private final DefaultValueParams defaultValueDefinition;
+
+ /** Creates a column definition. */
+ public ColumnParams(String name, ColumnType type, DefaultValueParams defaultValueDefinition, boolean nullable) {
this.name = Objects.requireNonNull(name, "name");
- this.type = Objects.requireNonNull(type);
+ this.type = Objects.requireNonNull(type, "type");
+ this.defaultValueDefinition = Objects.requireNonNull(defaultValueDefinition, "defaultValueDefinition");
this.nullable = nullable;
}
+ /**
+ * Get column's name.
+ */
public String name() {
return name;
}
- public boolean nullable() {
- return nullable;
- }
-
+ /**
+ * Get column's type.
+ */
public ColumnType type() {
return type;
}
- public int precision() {
- return precision;
+ /**
+ * Returns default value definition.
+ *
+ * @param <T> Desired subtype of the definition.
+ * @return Default value definition.
+ */
+ @SuppressWarnings("unchecked")
+ public <T extends DefaultValueParams> T defaultValueDefinition() {
+ return (T) defaultValueDefinition;
}
- public int scale() {
- return scale;
+ /**
+ * Get nullable flag: {@code true} if this column accepts nulls.
+ */
+ public boolean nullable() {
+ return nullable;
}
- public int length() {
- return length;
+ /**
+ * Get column's precision.
+ */
+ public Integer precision() {
+ return null;
}
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return S.toString(this);
+ /**
+ * Get column's scale.
+ */
+ public Integer scale() {
+ return null;
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
new file mode 100644
index 0000000000..d9fc08e73b
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parameters of a CREATE TABLE statement: a mutable holder populated through its setter-style methods.
+ */
+public class CreateTableParams {
+ /** Replicas number; {@code null} until set. */
+ private Integer replicas;
+
+ /** Number of partitions for the new table; {@code null} until set. */
+ private Integer partitions;
+
+ /** Primary key columns. */
+ private List<String> pkCols;
+
+ /** Colocation columns. */
+ private List<String> colocationCols;
+
+ /** Columns. */
+ private List<ColumnParams> cols;
+
+ private String dataStorage;
+
+ @Nullable
+ private Map<String, Object> dataStorageOptions;
+
+ private String zone;
+
+ /**
+ * Get primary key columns.
+ */
+ public List<String> primaryKeyColumns() {
+ return pkCols;
+ }
+
+ /**
+ * Set primary key columns.
+ */
+ public void primaryKeyColumns(List<String> pkCols) {
+ this.pkCols = pkCols;
+ }
+
+ /**
+ * Get replicas count, or {@code null} if it was not set.
+ */
+ @Nullable
+ public Integer replicas() {
+ return replicas;
+ }
+
+ /**
+ * Set replicas count. NOTE(review): {@code @Nullable} on this void setter has no effect - presumably a copy-paste leftover.
+ */
+ @Nullable
+ public void replicas(int repl) {
+ replicas = repl;
+ }
+
+ /**
+ * Get partitions count, or {@code null} if it was not set.
+ */
+ @Nullable
+ public Integer partitions() {
+ return partitions;
+ }
+
+ /**
+ * Set partitions count.
+ */
+ public void partitions(Integer parts) {
+ partitions = parts;
+ }
+
+ /**
+ * Get table columns.
+ *
+ * @return Columns.
+ */
+ public List<ColumnParams> columns() {
+ return cols;
+ }
+
+ /**
+ * Set table columns.
+ *
+ * @param cols Columns.
+ */
+ public void columns(List<ColumnParams> cols) {
+ this.cols = cols;
+ }
+
+ /**
+ * Get colocation column names.
+ *
+ * @return Colocation column names, or {@code null} if they were not set.
+ */
+ @Nullable
+ public List<String> colocationColumns() {
+ return colocationCols;
+ }
+
+ /**
+ * Set colocation column names.
+ *
+ * @param colocationCols Colocation column names.
+ */
+ public void colocationColumns(List<String> colocationCols) {
+ this.colocationCols = colocationCols;
+ }
+
+ /**
+ * Returns data storage.
+ */
+ public String dataStorage() {
+ return dataStorage;
+ }
+
+ /**
+ * Sets data storage.
+ *
+ * @param dataStorage Data storage.
+ */
+ public void dataStorage(String dataStorage) {
+ this.dataStorage = dataStorage;
+ }
+
+ /**
+ * Returns data storage options: the internal map if any option was added, otherwise an empty immutable map.
+ */
+ public Map<String, Object> dataStorageOptions() {
+ return dataStorageOptions == null ? Map.of() : dataStorageOptions;
+ }
+
+ /**
+ * Adds data storage option, creating the options map on first use.
+ *
+ * @param name Option name.
+ * @param value Option value.
+ */
+ public void addDataStorageOption(String name, Object value) {
+ if (dataStorageOptions == null) {
+ dataStorageOptions = new HashMap<>();
+ }
+
+ dataStorageOptions.put(name, value);
+ }
+
+ /**
+ * Get zone name, or {@code null} if it was not set.
+ */
+ @Nullable
+ public String zone() {
+ return zone;
+ }
+
+ /**
+ * Set zone name.
+ */
+ public void zone(String zoneName) {
+ this.zone = zoneName;
+ }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DefaultValueParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DefaultValueParams.java
new file mode 100644
index 0000000000..1e8d9227ab
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DefaultValueParams.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import java.util.Objects;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Definition of the value provider to use as a default: either a constant or a function call (see {@link Type}).
+ */
+@SuppressWarnings("PublicInnerClass")
+public class DefaultValueParams {
+ /**
+ * Defines value provider as functional provider.
+ *
+ * @param name Name of the function to invoke to generate the value; must not be {@code null}.
+ * @return Default value definition.
+ */
+ public static DefaultValueParams functionCall(String name) {
+ return new FunctionCall(Objects.requireNonNull(name, "name"));
+ }
+
+ /**
+ * Defines value provider as a constant value provider.
+ *
+ * @param value A value to use as default; may be {@code null}.
+ * @return Default value definition.
+ */
+ public static DefaultValueParams constant(@Nullable Object value) {
+ return new ConstantValue(value);
+ }
+
+ /** Types of the defaults. */
+ public enum Type {
+ /** Default is specified as a constant. */
+ CONSTANT,
+
+ /** Default is specified as a call to a function. */
+ FUNCTION_CALL
+ }
+
+ protected final Type type;
+
+ private DefaultValueParams(Type type) {
+ this.type = type;
+ }
+
+ /** Returns type of the default value. */
+ public Type type() {
+ return type;
+ }
+
+ /** Defines default value provider as a function call. */
+ public static class FunctionCall extends DefaultValueParams {
+ private final String functionName;
+
+ private FunctionCall(String functionName) {
+ super(Type.FUNCTION_CALL);
+ this.functionName = functionName;
+ }
+
+ /** Returns name of the function to use as value generator. */
+ public String functionName() {
+ return functionName;
+ }
+ }
+
+ /** Defines default value provider as a constant. */
+ public static class ConstantValue extends DefaultValueParams {
+ private final Object value;
+
+ private ConstantValue(Object value) {
+ super(Type.CONSTANT);
+ this.value = value;
+ }
+
+ /** Returns value to use as default; may be {@code null}. */
+ public Object value() {
+ return value;
+ }
+ }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java
index 9f5ba39ae3..d31a838e56 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptor.java
@@ -34,7 +34,9 @@ import org.apache.ignite.internal.tostring.S;
*/
public class CatalogDescriptor implements Serializable {
public static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
+
private static final long serialVersionUID = -2713639412596667759L;
+
private final int version;
private final long activationTimestamp;
private final Map<String, SchemaDescriptor> schemas;
@@ -44,7 +46,13 @@ public class CatalogDescriptor implements Serializable {
@IgniteToStringExclude
private transient Map<Integer, IndexDescriptor> indexesMap;
-
+ /**
+ * Constructor.
+ *
+ * @param version Catalog version.
+ * @param activationTimestamp Catalog activation timestamp.
+ * @param descriptors Schema descriptors.
+ */
public CatalogDescriptor(int version, long activationTimestamp, SchemaDescriptor[] descriptors) {
this.version = version;
this.activationTimestamp = activationTimestamp;
@@ -59,6 +67,10 @@ public class CatalogDescriptor implements Serializable {
rebuildMaps();
}
+ public int version() {
+ return version;
+ }
+
public long time() {
return activationTimestamp;
}
@@ -82,8 +94,8 @@ public class CatalogDescriptor implements Serializable {
private void rebuildMaps() {
tablesMap = schemas.values().stream().flatMap(s -> Arrays.stream(s.tables()))
.collect(Collectors.toUnmodifiableMap(ObjectDescriptor::id, Function.identity()));
- indexesMap = schemas.values().stream().flatMap(s -> Arrays.stream(s.indexes())).
- collect(Collectors.toUnmodifiableMap(ObjectDescriptor::id, Function.identity()));
+ indexesMap = schemas.values().stream().flatMap(s -> Arrays.stream(s.indexes()))
+ .collect(Collectors.toUnmodifiableMap(ObjectDescriptor::id, Function.identity()));
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java
index eabd9e5dff..5d019b3de3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/ObjectDescriptor.java
@@ -47,6 +47,11 @@ public abstract class ObjectDescriptor implements Serializable {
return name;
}
+ /** Return schema-object type. */
+ public Type type() {
+ return type;
+ }
+
/** {@inheritDoc} */
@Override
public String toString() {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java
index 4a4c53736f..0c82cf68a4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SchemaDescriptor.java
@@ -42,6 +42,15 @@ public class SchemaDescriptor extends ObjectDescriptor {
@IgniteToStringExclude
private transient Map<String, IndexDescriptor> indexesMap; //TODO: IGNITE-18535 Drop if not used.
+ /**
+ * Constructor.
+ *
+ * @param id Schema id.
+ * @param name Schema name.
+ * @param version Catalog version.
+ * @param tables Tables description.
+ * @param indexes Indexes description.
+ */
public SchemaDescriptor(int id, String name, int version, TableDescriptor[] tables, IndexDescriptor[] indexes) {
super(id, Type.SCHEMA, name);
this.version = version;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
index 4a7d9aeb60..4feafff9fa 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableColumnDescriptor.java
@@ -37,6 +37,13 @@ public class TableColumnDescriptor implements Serializable {
private int scale;
private String defaultValueExpression;
+ /**
+ * Constructor.
+ *
+ * @param name Column name.
+ * @param type Column type.
+ * @param nullable Nullability flag.
+ */
public TableColumnDescriptor(String name, ColumnType type, boolean nullable) {
this.name = Objects.requireNonNull(name, "name");
this.type = Objects.requireNonNull(type);
@@ -67,6 +74,10 @@ public class TableColumnDescriptor implements Serializable {
return length;
}
+ public String defaultValueExpression() {
+ return defaultValueExpression;
+ }
+
/** {@inheritDoc} */
@Override
public String toString() {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
index 6babceaf33..30569843b0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
@@ -46,6 +46,15 @@ public class TableDescriptor extends ObjectDescriptor {
@IgniteToStringExclude
private transient Map<String, TableColumnDescriptor> columnsMap;
+ /**
+ * Constructor.
+ *
+ * @param id Table id.
+ * @param name Table name.
+ * @param columns Table column descriptors.
+ * @param pkCols Primary key column names.
+ * @param colocationCols Colocation column names.
+ */
public TableDescriptor(int id, String name, TableColumnDescriptor[] columns, String[] pkCols, @Nullable String[] colocationCols) {
super(id, Type.TABLE, name);
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 5a23568f95..4f79e0bd90 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -57,6 +57,7 @@ dependencies {
implementation project(':ignite-placement-driver')
implementation project(':ignite-code-deployment')
implementation project(':ignite-security')
+ implementation project(':ignite-catalog')
implementation libs.jetbrains.annotations
implementation libs.micronaut.inject
implementation libs.micronaut.validation
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 87c3779524..999db9782b 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -45,6 +45,9 @@ import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.deployment.IgniteDeployment;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.CatalogServiceImpl;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -487,6 +490,8 @@ public class IgniteImpl implements Ignite {
indexManager = new IndexManager(tablesConfiguration, schemaManager, distributedTblMgr);
+ CatalogService catalogService = new CatalogServiceImpl(metaStorageMgr);
+
qryEngine = new SqlQueryProcessor(
registry,
clusterSvc,
@@ -498,7 +503,8 @@ public class IgniteImpl implements Ignite {
distributionZoneManager,
() -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
replicaSvc,
- clock
+ clock,
+ new CatalogManagerImpl(catalogService, metaStorageMgr)
);
sql = new IgniteSqlImpl(qryEngine);
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 944f2ed923..e8af2d0ac0 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-transactions')
implementation project(':ignite-replicator')
implementation project(':ignite-distribution-zones')
+ implementation project(':ignite-catalog')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.caffeine
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index c4ea27f00a..6b7b4a32cb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -43,6 +43,7 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.IndexManager;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -151,6 +153,9 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Clock. */
private final HybridClock clock;
+ /** Distributed catalog manager. */
+ private CatalogManager catalogManager;
+
/** Constructor. */
public SqlQueryProcessor(
Consumer<Function<Long, CompletableFuture<?>>> registry,
@@ -163,7 +168,8 @@ public class SqlQueryProcessor implements QueryProcessor {
DistributionZoneManager distributionZoneManager,
Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier,
ReplicaService replicaService,
- HybridClock clock
+ HybridClock clock,
+ CatalogManager catalogManager
) {
this.registry = registry;
this.clusterSrvc = clusterSrvc;
@@ -176,6 +182,7 @@ public class SqlQueryProcessor implements QueryProcessor {
this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
this.replicaService = replicaService;
this.clock = clock;
+ this.catalogManager = catalogManager;
}
/** {@inheritDoc} */
@@ -220,7 +227,9 @@ public class SqlQueryProcessor implements QueryProcessor {
this.prepareSvc = prepareSvc;
- var ddlCommandHandler = new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager);
+ var ddlCommandHandler = CatalogManager.USE_CATALOG
+ ? new DdlCommandHandlerWrapper(distributionZoneManager, tableManager, indexManager, dataStorageManager, catalogManager)
+ : new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager);
var executionSrvc = registerService(ExecutionServiceImpl.create(
clusterSrvc.topologyService(),
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
new file mode 100644
index 0000000000..f9c02a2d9f
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.ddl;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.index.IndexManager;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.table.distributed.TableManager;
+
+/**
+ * DDL command handler that additionally passes {@link CreateTableCommand}s to the {@link CatalogManager}.
+ */
+public class DdlCommandHandlerWrapper extends DdlCommandHandler {
+    /** Catalog manager that receives the converted DDL commands. */
+    private final CatalogManager catalogManager;
+
+    /** Constructor; {@code catalogManager} must not be {@code null}, the remaining arguments are passed to the base handler. */
+    public DdlCommandHandlerWrapper(
+            DistributionZoneManager distributionZoneManager,
+            TableManager tableManager,
+            IndexManager indexManager,
+            DataStorageManager dataStorageManager,
+            CatalogManager catalogManager
+    ) {
+        super(distributionZoneManager, tableManager, indexManager, dataStorageManager);
+
+        this.catalogManager = Objects.requireNonNull(catalogManager, "Catalog manager");
+    }
+
+    /** Passes {@link CreateTableCommand}s to the catalog manager, then delegates every command to the base handler. */
+    @Override
+    public CompletableFuture<Boolean> handle(DdlCommand cmd) {
+        if (cmd instanceof CreateTableCommand) {
+            // NOTE(review): the value returned by createTable is discarded, so the outcome of the catalog call
+            // never influences the future returned below - confirm this is intended.
+            catalogManager.createTable(DdlToCatalogCommandConverter.convert((CreateTableCommand) cmd));
+        }
+
+        return super.handle(cmd);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
new file mode 100644
index 0000000000..a4756079ec
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.ddl;
+
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateTableParams;
+import org.apache.ignite.internal.catalog.commands.DefaultValueParams;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.ColumnDefinition;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DefaultValueDefinition;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+
+/** Converts DDL command classes to Catalog command params classes. */
+final class DdlToCatalogCommandConverter {
+    /** Utility class with static converters only; instantiation is disallowed. */
+    private DdlToCatalogCommandConverter() {
+    }
+
+    /** Copies columns, key and colocation columns, partitions, replicas, zone and data storage settings into new params. */
+    static CreateTableParams convert(CreateTableCommand cmd) {
+        CreateTableParams params = new CreateTableParams();
+
+        params.columns(cmd.columns().stream().map(DdlToCatalogCommandConverter::convert).collect(Collectors.toList()));
+        params.colocationColumns(cmd.colocationColumns());
+        params.primaryKeyColumns(cmd.primaryKeyColumns());
+        params.partitions(cmd.partitions());
+        params.replicas(cmd.replicas());
+        params.zone(cmd.zone());
+        params.dataStorage(cmd.dataStorage());
+        cmd.dataStorageOptions().forEach(params::addDataStorageOption);
+
+        return params;
+    }
+
+    /** Converts a column definition: name, column type, default value and nullability. */
+    private static ColumnParams convert(ColumnDefinition def) {
+        return new ColumnParams(def.name(), TypeUtils.columnType(def.type()), convert(def.defaultValueDefinition()), def.nullable());
+    }
+
+    /** Converts a default value definition; throws {@link IllegalArgumentException} for types other than the two handled below. */
+    private static DefaultValueParams convert(DefaultValueDefinition def) {
+        switch (def.type()) {
+            case CONSTANT:
+                return DefaultValueParams.constant(((DefaultValueDefinition.ConstantValue) def).value());
+            case FUNCTION_CALL:
+                return DefaultValueParams.functionCall(((DefaultValueDefinition.FunctionCall) def).functionName());
+            default:
+                throw new IllegalArgumentException("Default value definition: " + def.type());
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 5c800ed23c..f67a825ea4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.IndexManager;
@@ -134,6 +135,9 @@ public class StopCalciteModuleTest {
@Mock
private HybridClock clock;
+ @Mock
+ private CatalogManager catalogManager;
+
private SchemaRegistry schemaReg;
private final TestRevisionRegister testRevisionRegister = new TestRevisionRegister();
@@ -224,7 +228,8 @@ public class StopCalciteModuleTest {
distributionZoneManager,
Map::of,
mock(ReplicaService.class),
- clock
+ clock,
+ catalogManager
);
when(tbl.tableId()).thenReturn(UUID.randomUUID());
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 7a159d6e6d..4d87d47f24 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListenerHolder;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -137,6 +138,9 @@ public class MockedStructuresTest extends IgniteAbstractTest {
@Mock
HybridClock clock;
+ @Mock
+ CatalogManager catalogManager;
+
/**
* Revision listener holder. It uses for the test configurations:
* <ul>
@@ -260,7 +264,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
)
),
mock(ReplicaService.class),
- clock
+ clock,
+ catalogManager
);
queryProc.start();