You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2021/11/23 15:44:05 UTC
[ignite-3] branch main updated: IGNITE-15838 Integrate plugin system in current SQL engine (#464)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4572913 IGNITE-15838 Integrate plugin system in current SQL engine (#464)
4572913 is described below
commit 45729130397062c188dc9bc2558d5014f0729d0e
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Tue Nov 23 18:44:00 2021 +0300
IGNITE-15838 Integrate plugin system in current SQL engine (#464)
---
.../query/calcite/SqlQueryProcessor.java | 62 ++++-
.../query/calcite/exec/ExecutionContext.java | 17 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 18 +-
.../query/calcite/exec/LogicalRelImplementor.java | 24 +-
.../calcite/extension/ExternalConvention.java | 59 ++++
.../query/calcite/extension/SqlExtension.java | 47 ++--
.../calcite/metadata/IgniteMdFragmentMapping.java | 30 +-
.../processors/query/calcite/prepare/Cloner.java | 214 +--------------
.../query/calcite/prepare/IgniteRelShuttle.java | 6 +
.../query/calcite/prepare/MappingQueryContext.java | 18 +-
.../query/calcite/prepare/PlannerPhase.java | 302 ++++++++++-----------
.../query/calcite/prepare/PlanningContext.java | 11 +
.../processors/query/calcite/prepare/Splitter.java | 7 +
.../query/calcite/rel/IgniteGateway.java | 144 ++++++++++
.../query/calcite/rel/IgniteRelVisitor.java | 5 +
.../query/calcite/schema/SchemaHolderImpl.java | 45 +++
.../processors/query/calcite/trait/TraitUtils.java | 105 ++++---
.../query/calcite/util/BaseQueryContext.java | 31 ++-
.../processors/query/calcite/util/Commons.java | 5 -
.../processors/query/calcite/util/TypeUtils.java | 5 +-
.../query/calcite/planner/PlannerTest.java | 17 +-
.../internal/calcite/ItSqlExtensionTest.java | 69 +++++
.../internal/calcite/extension/TestExtension.java | 168 ++++++++++++
.../calcite/extension/TestFilterConverterRule.java | 59 ++++
.../internal/calcite/extension/TestPhysFilter.java | 143 ++++++++++
.../calcite/extension/TestPhysTableScan.java | 78 ++++++
.../internal/calcite/extension/TestTableImpl.java | 101 +++++++
...processors.query.calcite.extension.SqlExtension | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +
29 files changed, 1331 insertions(+), 467 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
index 3b297f2..d23c6d0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
@@ -18,9 +18,12 @@
package org.apache.ignite.internal.processors.query.calcite;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
@@ -28,6 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionService
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
@@ -52,12 +56,6 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Size of the cache for query plans. */
public static final int PLAN_CACHE_SIZE = 1024;
- private volatile ExecutionService executionSrvc;
-
- private volatile MessageService msgSrvc;
-
- private volatile QueryTaskExecutor taskExecutor;
-
private final ClusterService clusterSrvc;
private final TableManager tableManager;
@@ -69,7 +67,15 @@ public class SqlQueryProcessor implements QueryProcessor {
private final QueryPlanCache planCache = new QueryPlanCacheImpl(PLAN_CACHE_SIZE);
/** Event listeners to close. */
- private final List<Pair<TableEvent, EventListener>> evtLsnrs = new ArrayList<>();
+ private final List<Pair<TableEvent, EventListener<TableEventParameters>>> evtLsnrs = new ArrayList<>();
+
+ private volatile ExecutionService executionSrvc;
+
+ private volatile MessageService msgSrvc;
+
+ private volatile QueryTaskExecutor taskExecutor;
+
+ private volatile Map<String, SqlExtension> extensions;
public SqlQueryProcessor(
ClusterService clusterSrvc,
@@ -90,6 +96,16 @@ public class SqlQueryProcessor implements QueryProcessor {
taskExecutor
);
+ List<SqlExtension> extensionList = new ArrayList<>();
+
+ ServiceLoader<SqlExtension> loader = ServiceLoader.load(SqlExtension.class);
+
+ loader.reload();
+
+ loader.forEach(extensionList::add);
+
+ extensions = extensionList.stream().collect(Collectors.toMap(SqlExtension::name, Function.identity()));
+
SchemaHolderImpl schemaHolder = new SchemaHolderImpl(planCache::clear);
executionSrvc = new ExecutionServiceImpl<>(
@@ -98,7 +114,8 @@ public class SqlQueryProcessor implements QueryProcessor {
planCache,
schemaHolder,
taskExecutor,
- ArrayRowHandler.INSTANCE
+ ArrayRowHandler.INSTANCE,
+ extensions
);
registerTableListener(TableEvent.CREATE, new TableCreatedListener(schemaHolder));
@@ -109,6 +126,8 @@ public class SqlQueryProcessor implements QueryProcessor {
msgSrvc.start();
executionSrvc.start();
planCache.start();
+
+ extensionList.forEach(ext -> ext.init(catalog -> schemaHolder.registerExternalCatalog(ext.name(), catalog)));
}
private void registerTableListener(TableEvent evt, AbstractTableEventListener lsnr) {
@@ -118,21 +137,34 @@ public class SqlQueryProcessor implements QueryProcessor {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override
public void stop() throws Exception {
busyLock.block();
- List<AutoCloseable> toClose = new ArrayList<>(Arrays.asList(
+ List<AutoCloseable> toClose = new ArrayList<>();
+
+ Map<String, SqlExtension> extensions = this.extensions;
+ if (extensions != null) {
+ toClose.addAll(
+ extensions.values().stream()
+ .map(ext -> (AutoCloseable) ext::stop)
+ .collect(Collectors.toList())
+ );
+ }
+
+ Stream<AutoCloseable> closableComponents = Stream.of(
executionSrvc::stop,
msgSrvc::stop,
taskExecutor::stop,
planCache::stop
- ));
+ );
+
+ Stream<AutoCloseable> closableListeners = evtLsnrs.stream()
+ .map((p) -> () -> tableManager.removeListener(p.left, p.right));
- toClose.addAll(evtLsnrs.stream()
- .map((p) -> (AutoCloseable) () -> tableManager.removeListener(p.left, p.right))
- .collect(Collectors.toList()));
+ toClose.addAll(
+ Stream.concat(closableComponents, closableListeners).collect(Collectors.toList())
+ );
IgniteUtils.closeAll(toClose);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 4e1cdca..b721787 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -32,6 +32,7 @@ import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Runtime context allowing access to the tables in a database.
@@ -76,8 +78,8 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
private final AtomicBoolean cancelFlag = new AtomicBoolean();
/**
- * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value
- * throughout the query.
+ * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value throughout the
+ * query.
*/
private final long startTs;
@@ -87,7 +89,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
* Constructor.
*
* @param executor Task executor.
- * @param qctx Base query context.
+ * @param qctx Base query context.
* @param qryId Query ID.
* @param fragmentDesc Partitions information.
* @param handler Row handler.
@@ -210,6 +212,15 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
return topVer;
}
+ /**
+ * Get an extensions by it's name.
+ *
+ * @return An extensions or {@code null} if there is no extension with given name.
+ */
+ public @Nullable SqlExtension extension(String name) {
+ return qctx.extension(name);
+ }
+
/** {@inheritDoc} */
@Override
public SchemaPlus getRootSchema() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 50a86a4..7974fcd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
@@ -142,6 +143,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
private final DdlSqlToCommandConverter ddlConverter;
+ private final Map<String, SqlExtension> extensions;
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -152,13 +155,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
QueryPlanCache planCache,
SchemaHolder schemaHolder,
QueryTaskExecutor taskExecutor,
- RowHandler<RowT> handler
+ RowHandler<RowT> handler,
+ Map<String, SqlExtension> extensions
) {
this.topSrvc = topSrvc;
this.handler = handler;
this.msgSrvc = msgSrvc;
this.schemaHolder = schemaHolder;
this.taskExecutor = taskExecutor;
+ this.extensions = extensions;
locNodeId = topSrvc.localMember().id();
qryPlanCache = planCache;
@@ -227,7 +232,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
BaseQueryContext qctx,
Object[] params
) {
- plan.init(mappingSrvc, new MappingQueryContext(locNodeId, topologyVersion()));
+ plan.init(mappingSrvc, new MappingQueryContext(qctx, locNodeId, topologyVersion()));
List<Fragment> fragments = plan.fragments();
@@ -365,6 +370,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
.build()
)
.logger(LOG)
+ .extensions(extensions)
.build();
}
@@ -383,9 +389,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
sqlNode = validated.sqlNode();
-
+
IgniteRel igniteRel = optimize(sqlNode, planner);
-
+
// Split query plan to query fragments.
List<Fragment> fragments = new Splitter().go(igniteRel);
@@ -444,7 +450,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
// Convert to Relational operators graph
IgniteRel igniteRel = optimize(sqlNode, planner);
-
+
// Split query plan to query fragments.
List<Fragment> fragments = new Splitter().go(igniteRel);
@@ -471,7 +477,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
// Convert to Relational operators graph
IgniteRel igniteRel = optimize(sql, planner);
-
+
String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
return new ExplainPlan(plan, explainFieldsMetadata(ctx));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 6286559..e214429 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.lang.IgniteInternalException;
/**
* Implements a query plan.
@@ -317,6 +319,9 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
ImmutableBitSet requiredColumns = rel.requiredColumns();
InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
+
+ assert tbl != null;
+
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
@@ -339,6 +344,18 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
+ public Node<RowT> visit(IgniteGateway rel) {
+ var extension = ctx.extension(rel.extensionName());
+
+ if (extension == null) {
+ throw new IgniteInternalException("Unknown SQL extension \"" + rel.extensionName() + "\"");
+ }
+
+ return extension.<RowT>implementor().implement(ctx, (IgniteRel) rel.getInput());
+ }
+
+ /** {@inheritDoc} */
+ @Override
public Node<RowT> visit(IgniteValues rel) {
List<RexLiteral> vals = Commons.flat(Commons.cast(rel.getTuples()));
@@ -493,7 +510,11 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
case INSERT:
case UPDATE:
case DELETE:
- ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), rel.getTable().unwrap(InternalIgniteTable.class),
+ InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
+
+ assert tbl != null;
+
+ ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), tbl,
rel.getOperation(), rel.getUpdateColumnList());
Node<RowT> input = visit(rel.getInput());
@@ -697,6 +718,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
return visit((IgniteRel) rel);
}
+ @SuppressWarnings("unchecked")
public <T extends Node<RowT>> T go(IgniteRel rel) {
return (T) visit(rel);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/ExternalConvention.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/ExternalConvention.java
new file mode 100644
index 0000000..6580ee0
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/ExternalConvention.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.extension;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ * A handy class to introduce a new convention to the engine via Extension API.
+ */
+public class ExternalConvention extends Convention.Impl {
+ /**
+ * Constructor.
+ *
+ * @param name The name of the convention. Must be equal to {@link SqlExtension#name()}.
+ * @param relClass The interface (usually, but could be a class as well) every relation of given convention have to implement.
+ */
+ public ExternalConvention(String name, Class<? extends IgniteRel> relClass) {
+ super(name, relClass);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelNode enforce(RelNode rel, RelTraitSet toTraits) {
+ return TraitUtils.enforce(rel, toTraits);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ConventionTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean canConvertConvention(Convention toConvention) {
+ return toConvention == IgniteConvention.INSTANCE;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java
index 7fa84ed..93abdb4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java
@@ -20,25 +20,30 @@ package org.apache.ignite.internal.processors.query.calcite.extension;
import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.query.calcite.SqlQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.jetbrains.annotations.Nullable;
/**
* Entry point to extend current sql engine with external storage or even custom execution.
+ *
+ * <h3>Extension lifecycle</h3>
+ * All extensions are created in a start phase of {@link SqlQueryProcessor} and initialized after all other components
+ * of {@link SqlQueryProcessor} have been started.
+ *
+ * <p>All extensions are stopped in the very beginning of the stop phase of {@link SqlQueryProcessor}, just before the other components.
*/
public interface SqlExtension {
/**
* Returns the name of the current extension.
*
* <p>This name will be used to distinguish between different
- * extensions. Also the {@link CatalogUpdateListener} will register
- * provided catalog with the same name.
+ * extensions. Also the {@link CatalogUpdateListener} will register provided catalog with the same name.
*
* @return Name of the extension.
*/
@@ -47,29 +52,29 @@ public interface SqlExtension {
/**
* Initializes the extension before use.
*
- * @param ignite Instance of the current Ignite node.
- * @param catalogUpdateListener Listener to notify when new table or schema
- * are available. Note: the catalog listener creates
- * copy of the provided catalog, so its not enough
- * to just update {@link ExternalCatalog catalog} or
- * {@link ExternalSchema schema}, the listener should
- * be called explicitly to register changes.
+ * @param catalogUpdateListener Listener to notify when new table or schema are available. Note: the catalog listener creates copy of
+ * the provided catalog, so its not enough to just update {@link ExternalCatalog catalog} or {@link
+ * ExternalSchema schema}, the listener should be called explicitly to register changes.
* @see ExternalSchema
* @see ExternalCatalog
*/
void init(
- Ignite ignite,
CatalogUpdateListener catalogUpdateListener
);
/**
+ * Stops the extension.
+ */
+ default void stop() {}
+
+ /**
* Returns a set of optimization rules for given optimization phase.
*
* @param phase Current optimization phase.
* @return Set of rules, or empty set if there are no rules for given phase.
*/
Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase);
-
+
/**
* Returns an implementor of relations provided by current extension.
*
@@ -80,7 +85,7 @@ public interface SqlExtension {
* @see RelImplementor
*/
<RowT> RelImplementor<RowT> implementor();
-
+
/**
* Returns colocation group for given relational tree.
*
@@ -91,7 +96,7 @@ public interface SqlExtension {
* @return Colocation of given relation tree.
*/
ColocationGroup colocationGroup(IgniteRel node);
-
+
/**
* Implementer to create execution nodes from provided relational nodes.
*
@@ -108,36 +113,36 @@ public interface SqlExtension {
* <p>It's guaranteed that the tree will only consist of the relations
* provided by the current extension.
*
- * @param ctx An execution context.
+ * @param ctx An execution context.
* @param node A root of the relational tree.
* @return A root of the resulting execution tree.
*/
Node<RowT> implement(ExecutionContext<RowT> ctx, IgniteRel node);
}
-
+
/**
* Represents an external SQL schema that is simply a group different tables.
*/
interface ExternalSchema {
/** Returns list of all tables provided by current schema. */
List<String> tableNames();
-
+
/**
* Returns table by its name.
*
* @param name Name of the table.
* @return The table, or {@code null} if there is no table with given name.
*/
- @Nullable RelOptTable table(String name);
+ @Nullable IgniteTable table(String name);
}
-
+
/**
* Represents an external SQL catalog that is simply a group different schemas.
*/
interface ExternalCatalog {
/** Returns list of all schemas provided by current catalog. */
List<String> schemaNames();
-
+
/**
* Returns schema by its name.
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
index 2c3139a..4fd597b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
@@ -33,8 +33,10 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada
import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgnite
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.lang.IgniteInternalException;
/**
* Implementation class for {@link RelMetadataQueryEx#fragmentMapping(RelNode, MappingQueryContext)} method call.
@@ -103,10 +106,9 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
* See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*
* <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the fragment
- * (which part the parent node is) cannot be executed on any node and additional exchange is needed.
- * This case we throw {@link NodeMappingException} with an edge, where we need the additional exchange.
- * After the exchange is put into the fragment and the fragment is split into two ones,
- * fragment meta information will be recalculated for all fragments.
+ * (which part the parent node is) cannot be executed on any node and additional exchange is needed. This case we throw {@link
+ * NodeMappingException} with an edge, where we need the additional exchange. After the exchange is put into the fragment and the
+ * fragment is split into two ones, fragment meta information will be recalculated for all fragments.
*/
public FragmentMapping fragmentMapping(BiRel rel, RelMetadataQuery mq, MappingQueryContext ctx) {
RelNode left = rel.getLeft();
@@ -139,10 +141,9 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
* See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}
*
* <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the
- * fragment (which part the parent node is) cannot be executed on any node and additional exchange is needed. This
- * case we throw {@link NodeMappingException} with an edge, where we need the additional exchange. After the
- * exchange is put into the fragment and the fragment is split into two ones, fragment meta information will be
- * recalculated for all fragments.
+ * fragment (which part the parent node is) cannot be executed on any node and additional exchange is needed. This case we throw {@link
+ * NodeMappingException} with an edge, where we need the additional exchange. After the exchange is put into the fragment and the
+ * fragment is split into two ones, fragment meta information will be recalculated for all fragments.
*/
public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq, MappingQueryContext ctx) {
FragmentMapping res = null;
@@ -222,6 +223,19 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
/**
* See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
+ public FragmentMapping fragmentMapping(IgniteGateway rel, RelMetadataQuery mq, MappingQueryContext ctx) {
+ var extension = ctx.extension(rel.extensionName());
+
+ if (extension == null) {
+ throw new IgniteInternalException("Unknown SQL extension \"" + rel.extensionName() + "\"");
+ }
+
+ return FragmentMapping.create(rel.sourceId(), extension.colocationGroup((IgniteRel) rel.getInput()));
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
+ */
public FragmentMapping fragmentMapping(IgniteTableFunctionScan rel, RelMetadataQuery mq, MappingQueryContext ctx) {
return FragmentMapping.create();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 58a8b0f..7891173 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -17,47 +17,18 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
-
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Cloner.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-public class Cloner implements IgniteRelVisitor<IgniteRel> {
+public class Cloner {
private final RelOptCluster cluster;
private List<IgniteReceiver> remotes;
@@ -94,182 +65,21 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
return c.visit(r);
}
- private IgniteReceiver collect(IgniteReceiver receiver) {
- if (remotes != null) {
- remotes.add(receiver);
+ private IgniteRel collect(IgniteRel rel) {
+ if (rel instanceof IgniteReceiver && remotes != null) {
+ remotes.add((IgniteReceiver) rel);
}
- return receiver;
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteSender rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteFilter rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteTrimExchange rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteProject rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteTableModify rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteNestedLoopJoin rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
- visit((IgniteRel) rel.getRight())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
- visit((IgniteRel) rel.getRight())));
+ return rel;
}
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteMergeJoin rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
- visit((IgniteRel) rel.getRight())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteIndexScan rel) {
- return rel.clone(cluster, asList());
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteTableScan rel) {
- return rel.clone(cluster, asList());
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteValues rel) {
- return rel.clone(cluster, asList());
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteUnionAll rel) {
- return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteSort rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteTableSpool rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteSortedIndexSpool rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteLimit rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteReceiver rel) {
- return collect((IgniteReceiver) rel.clone(cluster, asList()));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteExchange rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteSingleHashAggregate rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteMapHashAggregate rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteReduceHashAggregate rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteSingleSortAggregate rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteMapSortAggregate rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteReduceSortAggregate rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteHashIndexSpool rel) {
- return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteSetOp rel) {
- return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteRel visit(IgniteTableFunctionScan rel) {
- return rel.clone(cluster, asList());
- }
-
- /** {@inheritDoc} */
- @Override
+ /**
+ * Clones and associates a plan with a new cluster.
+ *
+ * @param rel The head of the relational tree.
+ * @return A new tree.
+ */
public IgniteRel visit(IgniteRel rel) {
- return rel.accept(this);
+ return collect(rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0))));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index a728aed..934d4a9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
@@ -217,6 +218,11 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteGateway rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
@Override
public IgniteRel visit(IgniteRel rel) {
return rel.accept(this);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 90f240b..27d8686 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -17,10 +17,16 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
+import org.jetbrains.annotations.Nullable;
+
/**
* Query mapping context.
*/
public class MappingQueryContext {
+ private final BaseQueryContext qctx;
+
private final String locNodeId;
private final long topVer;
@@ -31,11 +37,21 @@ public class MappingQueryContext {
* @param locNodeId Local node identifier.
* @param topVer Topology version to map.
*/
- public MappingQueryContext(String locNodeId, long topVer) {
+ public MappingQueryContext(BaseQueryContext qctx, String locNodeId, long topVer) {
+ this.qctx = qctx;
this.locNodeId = locNodeId;
this.topVer = topVer;
}
+ /**
+ * Get an extensions by it's name.
+ *
+ * @return An extensions or {@code null} if there is no extension with given name.
+ */
+ public @Nullable SqlExtension extension(String name) {
+ return qctx.extension(name);
+ }
+
public String localNodeId() {
return locNodeId;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 19c553d..64ef4eb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.cbo;
import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.hep;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
@@ -69,56 +72,45 @@ import org.apache.ignite.internal.processors.query.calcite.rule.logical.ProjectS
* Represents a planner phase with its description and a used rule set.
*/
public enum PlannerPhase {
- HEP_DECORRELATE("Heuristic phase to decorrelate subqueries") {
+ HEP_DECORRELATE(
+ "Heuristic phase to decorrelate subqueries",
+ CoreRules.FILTER_SUB_QUERY_TO_CORRELATE,
+ CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE,
+ CoreRules.JOIN_SUB_QUERY_TO_CORRELATE
+ ) {
/** {@inheritDoc} */
@Override
- public RuleSet getRules(PlanningContext ctx) {
- return RuleSets.ofList(
- CoreRules.FILTER_SUB_QUERY_TO_CORRELATE,
- CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE,
- CoreRules.JOIN_SUB_QUERY_TO_CORRELATE
- );
- }
-
- /** {@inheritDoc} */
- @Override public Program getProgram(PlanningContext ctx) {
+ public Program getProgram(PlanningContext ctx) {
return hep(getRules(ctx));
}
},
- HEP_FILTER_PUSH_DOWN("Heuristic phase to push down filters") {
- /** {@inheritDoc} */
- @Override public RuleSet getRules(PlanningContext ctx) {
- return RuleSets.ofList(
- CoreRules.FILTER_MERGE,
- CoreRules.FILTER_AGGREGATE_TRANSPOSE,
- CoreRules.FILTER_SET_OP_TRANSPOSE,
- CoreRules.JOIN_CONDITION_PUSH,
- CoreRules.FILTER_INTO_JOIN,
- CoreRules.FILTER_PROJECT_TRANSPOSE
- );
- }
-
+ HEP_FILTER_PUSH_DOWN(
+ "Heuristic phase to push down filters",
+ CoreRules.FILTER_MERGE,
+ CoreRules.FILTER_AGGREGATE_TRANSPOSE,
+ CoreRules.FILTER_SET_OP_TRANSPOSE,
+ CoreRules.JOIN_CONDITION_PUSH,
+ CoreRules.FILTER_INTO_JOIN,
+ CoreRules.FILTER_PROJECT_TRANSPOSE
+ ) {
/** {@inheritDoc} */
- @Override public Program getProgram(PlanningContext ctx) {
+ @Override
+ public Program getProgram(PlanningContext ctx) {
return hep(getRules(ctx));
}
},
- HEP_PROJECT_PUSH_DOWN("Heuristic phase to push down and merge projects") {
- /** {@inheritDoc} */
- @Override public RuleSet getRules(PlanningContext ctx) {
- return RuleSets.ofList(
- ProjectScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
- ProjectScanMergeRule.INDEX_SCAN_SKIP_CORRELATED,
-
- CoreRules.JOIN_PUSH_EXPRESSIONS,
- CoreRules.PROJECT_MERGE,
- CoreRules.PROJECT_REMOVE,
- CoreRules.PROJECT_FILTER_TRANSPOSE
- );
- }
+ HEP_PROJECT_PUSH_DOWN(
+ "Heuristic phase to push down and merge projects",
+ ProjectScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
+ ProjectScanMergeRule.INDEX_SCAN_SKIP_CORRELATED,
+ CoreRules.JOIN_PUSH_EXPRESSIONS,
+ CoreRules.PROJECT_MERGE,
+ CoreRules.PROJECT_REMOVE,
+ CoreRules.PROJECT_FILTER_TRANSPOSE
+ ) {
/** {@inheritDoc} */
@Override
public Program getProgram(PlanningContext ctx) {
@@ -126,117 +118,111 @@ public enum PlannerPhase {
}
},
- OPTIMIZATION("Main optimization phase") {
- /** {@inheritDoc} */
- @Override
- public RuleSet getRules(PlanningContext ctx) {
- return ctx.rules(
- RuleSets.ofList(
- FilterMergeRule.Config.DEFAULT
- .withOperandFor(LogicalFilter.class).toRule(),
-
- JoinPushThroughJoinRule.Config.LEFT
- .withOperandFor(LogicalJoin.class).toRule(),
-
- JoinPushThroughJoinRule.Config.RIGHT
- .withOperandFor(LogicalJoin.class).toRule(),
-
- JoinPushExpressionsRule.Config.DEFAULT
- .withOperandFor(LogicalJoin.class).toRule(),
-
- JoinConditionPushRule.Config.DEFAULT
- .withOperandSupplier(b -> b.operand(LogicalJoin.class)
- .anyInputs()).toRule(),
-
- FilterIntoJoinRule.Config.DEFAULT
- .withOperandSupplier(b0 ->
- b0.operand(LogicalFilter.class).oneInput(b1 ->
- b1.operand(LogicalJoin.class).anyInputs())).toRule(),
-
- FilterProjectTransposeRule.Config.DEFAULT
- .withOperandFor(LogicalFilter.class, f -> true, LogicalProject.class, p -> true).toRule(),
-
- ProjectFilterTransposeRule.Config.DEFAULT
- .withOperandFor(LogicalProject.class, LogicalFilter.class).toRule(),
-
- ProjectMergeRule.Config.DEFAULT
- .withOperandFor(LogicalProject.class).toRule(),
-
- ProjectRemoveRule.Config.DEFAULT
- .withOperandSupplier(b ->
- b.operand(LogicalProject.class)
- .predicate(ProjectRemoveRule::isTrivial)
- .anyInputs()).toRule(),
-
- AggregateMergeRule.Config.DEFAULT
- .withOperandSupplier(b0 ->
- b0.operand(LogicalAggregate.class)
- .oneInput(b1 ->
- b1.operand(LogicalAggregate.class)
- .predicate(Aggregate::isSimple)
- .anyInputs())).toRule(),
-
- AggregateExpandDistinctAggregatesRule.Config.JOIN.toRule(),
-
- SortRemoveRule.Config.DEFAULT
- .withOperandSupplier(b ->
- b.operand(LogicalSort.class)
- .anyInputs()).toRule(),
-
- CoreRules.UNION_MERGE,
- CoreRules.MINUS_MERGE,
- CoreRules.INTERSECT_MERGE,
- CoreRules.UNION_REMOVE,
- CoreRules.JOIN_COMMUTE,
- CoreRules.AGGREGATE_REMOVE,
- CoreRules.JOIN_COMMUTE_OUTER,
-
- // Useful of this rule is not clear now.
- // CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
-
- PruneEmptyRules.SortFetchZeroRuleConfig.EMPTY
- .withOperandSupplier(b ->
- b.operand(LogicalSort.class).anyInputs())
- .withDescription("PruneSortLimit0")
- .as(PruneEmptyRules.SortFetchZeroRuleConfig.class)
- .toRule(),
-
- ExposeIndexRule.INSTANCE,
- ProjectScanMergeRule.TABLE_SCAN,
- ProjectScanMergeRule.INDEX_SCAN,
- FilterSpoolMergeToSortedIndexSpoolRule.INSTANCE,
- FilterSpoolMergeToHashIndexSpoolRule.INSTANCE,
- FilterScanMergeRule.TABLE_SCAN,
- FilterScanMergeRule.INDEX_SCAN,
-
- LogicalOrToUnionRule.INSTANCE,
-
- CorrelatedNestedLoopJoinRule.INSTANCE,
- CorrelateToNestedLoopRule.INSTANCE,
- NestedLoopJoinConverterRule.INSTANCE,
- MergeJoinConverterRule.INSTANCE,
-
- ValuesConverterRule.INSTANCE,
- LogicalScanConverterRule.INDEX_SCAN,
- LogicalScanConverterRule.TABLE_SCAN,
- HashAggregateConverterRule.SINGLE,
- HashAggregateConverterRule.MAP_REDUCE,
- SortAggregateConverterRule.SINGLE,
- SortAggregateConverterRule.MAP_REDUCE,
- SetOpConverterRule.SINGLE_MINUS,
- SetOpConverterRule.MAP_REDUCE_MINUS,
- SetOpConverterRule.SINGLE_INTERSECT,
- SetOpConverterRule.MAP_REDUCE_INTERSECT,
- ProjectConverterRule.INSTANCE,
- FilterConverterRule.INSTANCE,
- TableModifyConverterRule.INSTANCE,
- UnionConverterRule.INSTANCE,
- SortConverterRule.INSTANCE,
- TableFunctionScanConverterRule.INSTANCE
- )
- );
- }
-
+ OPTIMIZATION(
+ "Main optimization phase",
+ FilterMergeRule.Config.DEFAULT
+ .withOperandFor(LogicalFilter.class).toRule(),
+
+ JoinPushThroughJoinRule.Config.LEFT
+ .withOperandFor(LogicalJoin.class).toRule(),
+
+ JoinPushThroughJoinRule.Config.RIGHT
+ .withOperandFor(LogicalJoin.class).toRule(),
+
+ JoinPushExpressionsRule.Config.DEFAULT
+ .withOperandFor(LogicalJoin.class).toRule(),
+
+ JoinConditionPushRule.Config.DEFAULT
+ .withOperandSupplier(b -> b.operand(LogicalJoin.class)
+ .anyInputs()).toRule(),
+
+ FilterIntoJoinRule.Config.DEFAULT
+ .withOperandSupplier(b0 ->
+ b0.operand(LogicalFilter.class).oneInput(b1 ->
+ b1.operand(LogicalJoin.class).anyInputs())).toRule(),
+
+ FilterProjectTransposeRule.Config.DEFAULT
+ .withOperandFor(LogicalFilter.class, f -> true, LogicalProject.class, p -> true)
+ .toRule(),
+
+ ProjectFilterTransposeRule.Config.DEFAULT
+ .withOperandFor(LogicalProject.class, LogicalFilter.class).toRule(),
+
+ ProjectMergeRule.Config.DEFAULT
+ .withOperandFor(LogicalProject.class).toRule(),
+
+ ProjectRemoveRule.Config.DEFAULT
+ .withOperandSupplier(b ->
+ b.operand(LogicalProject.class)
+ .predicate(ProjectRemoveRule::isTrivial)
+ .anyInputs()).toRule(),
+
+ AggregateMergeRule.Config.DEFAULT
+ .withOperandSupplier(b0 ->
+ b0.operand(LogicalAggregate.class)
+ .oneInput(b1 ->
+ b1.operand(LogicalAggregate.class)
+ .predicate(Aggregate::isSimple)
+ .anyInputs())).toRule(),
+
+ AggregateExpandDistinctAggregatesRule.Config.JOIN.toRule(),
+
+ SortRemoveRule.Config.DEFAULT
+ .withOperandSupplier(b ->
+ b.operand(LogicalSort.class)
+ .anyInputs()).toRule(),
+
+ CoreRules.UNION_MERGE,
+ CoreRules.MINUS_MERGE,
+ CoreRules.INTERSECT_MERGE,
+ CoreRules.UNION_REMOVE,
+ CoreRules.JOIN_COMMUTE,
+ CoreRules.AGGREGATE_REMOVE,
+ CoreRules.JOIN_COMMUTE_OUTER,
+
+ // Useful of this rule is not clear now.
+ // CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
+
+ PruneEmptyRules.SortFetchZeroRuleConfig.EMPTY
+ .withOperandSupplier(b ->
+ b.operand(LogicalSort.class).anyInputs())
+ .withDescription("PruneSortLimit0")
+ .as(PruneEmptyRules.SortFetchZeroRuleConfig.class)
+ .toRule(),
+
+ ExposeIndexRule.INSTANCE,
+ ProjectScanMergeRule.TABLE_SCAN,
+ ProjectScanMergeRule.INDEX_SCAN,
+ FilterSpoolMergeToSortedIndexSpoolRule.INSTANCE,
+ FilterSpoolMergeToHashIndexSpoolRule.INSTANCE,
+ FilterScanMergeRule.TABLE_SCAN,
+ FilterScanMergeRule.INDEX_SCAN,
+
+ LogicalOrToUnionRule.INSTANCE,
+
+ CorrelatedNestedLoopJoinRule.INSTANCE,
+ CorrelateToNestedLoopRule.INSTANCE,
+ NestedLoopJoinConverterRule.INSTANCE,
+ MergeJoinConverterRule.INSTANCE,
+
+ ValuesConverterRule.INSTANCE,
+ LogicalScanConverterRule.INDEX_SCAN,
+ LogicalScanConverterRule.TABLE_SCAN,
+ HashAggregateConverterRule.SINGLE,
+ HashAggregateConverterRule.MAP_REDUCE,
+ SortAggregateConverterRule.SINGLE,
+ SortAggregateConverterRule.MAP_REDUCE,
+ SetOpConverterRule.SINGLE_MINUS,
+ SetOpConverterRule.MAP_REDUCE_MINUS,
+ SetOpConverterRule.SINGLE_INTERSECT,
+ SetOpConverterRule.MAP_REDUCE_INTERSECT,
+ ProjectConverterRule.INSTANCE,
+ FilterConverterRule.INSTANCE,
+ TableModifyConverterRule.INSTANCE,
+ UnionConverterRule.INSTANCE,
+ SortConverterRule.INSTANCE,
+ TableFunctionScanConverterRule.INSTANCE
+ ) {
/** {@inheritDoc} */
@Override
public Program getProgram(PlanningContext ctx) {
@@ -246,11 +232,17 @@ public enum PlannerPhase {
public final String description;
+ private final List<RelOptRule> rules;
+
/**
- * Set phase description.
+ * Constructor.
+ *
+ * @param description A description of the phase.
+ * @param rules A list of rules associated with the current phase.
*/
- PlannerPhase(String description) {
+ PlannerPhase(String description, RelOptRule... rules) {
this.description = description;
+ this.rules = List.of(rules);
}
/**
@@ -259,7 +251,13 @@ public enum PlannerPhase {
* @param ctx Planner context.
* @return Rule set.
*/
- public abstract RuleSet getRules(PlanningContext ctx);
+ public RuleSet getRules(PlanningContext ctx) {
+ List<RelOptRule> rules = new ArrayList<>(this.rules);
+
+ ctx.extensions().forEach(p -> rules.addAll(p.getOptimizerRules(this)));
+
+ return ctx.rules(RuleSets.ofList(rules));
+ }
/**
* Returns a program, calculated on the basis of query, planner context planner phase and rules set.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index 8e6be74..4fb4504 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.util.List;
import java.util.function.Function;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
@@ -27,6 +28,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RuleSet;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.jetbrains.annotations.NotNull;
@@ -66,6 +68,15 @@ public final class PlanningContext implements Context {
}
/**
+ * Get list of extensions.
+ *
+ * @return List of extensions.
+ */
+ public List<SqlExtension> extensions() {
+ return unwrap(BaseQueryContext.class).extensions();
+ }
+
+ /**
* Get query.
*/
public String query() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index 6bcbd68..738d86c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -104,6 +105,12 @@ public class Splitter extends IgniteRelShuttle {
return rel.clone(IdGenerator.nextId());
}
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteGateway rel) {
+ return rel.clone(IdGenerator.nextId());
+ }
+
private static class FragmentProto {
private final long id;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteGateway.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteGateway.java
new file mode 100644
index 0000000..aa4b7d5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteGateway.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+
+/**
+ * A gateway node connecting relational trees with different conventions.
+ */
+public class IgniteGateway extends SingleRel implements SourceAwareIgniteRel {
+ private static final String EXTENSION_NAME_TERM = "name";
+
+ private final String extensionName;
+
+ private final long sourceId;
+
+ /**
+ * Constructor.
+ *
+ * @param extensionName A name of the extension the input node belongs to.
+ * @param cluster Cluster this node belongs to.
+ * @param traits A set of the traits this node satisfy.
+ * @param input An input relation.
+ */
+ public IgniteGateway(String extensionName, RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ this(extensionName, -1L, cluster, traits, input);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param extensionName A name of the extension the input node belongs to.
+ * @param sourceId An id of the source this gateway belongs to.
+ * @param cluster Cluster this node belongs to.
+ * @param traits A set of the traits this node satisfy.
+ * @param input An input relation.
+ */
+ private IgniteGateway(String extensionName, long sourceId, RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ super(cluster, traits, input);
+
+ this.sourceId = sourceId;
+ this.extensionName = extensionName;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param input Context to recover this relation from.
+ */
+ public IgniteGateway(RelInput input) {
+ super(input.getCluster(), input.getTraitSet().replace(IgniteConvention.INSTANCE), input.getInput());
+
+ extensionName = input.getString(EXTENSION_NAME_TERM);
+
+ Object srcIdObj = input.get("sourceId");
+ if (srcIdObj != null) {
+ sourceId = ((Number) srcIdObj).longValue();
+ } else {
+ sourceId = -1;
+ }
+ }
+
+ /**
+ * Returns the name of the extension the input node belongs to.
+ *
+ * @return The name of the extension.
+ */
+ public String extensionName() {
+ return extensionName;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .item(EXTENSION_NAME_TERM, extensionName);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isEnforcer() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return planner.getCostFactory().makeZeroCost();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteGateway(extensionName, cluster, getTraitSet(), sole(inputs));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(long sourceId) {
+ return new IgniteGateway(extensionName, sourceId, getCluster(), getTraitSet(), getInput());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new IgniteGateway(extensionName, getCluster(), traitSet, sole(inputs));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long sourceId() {
+ return sourceId;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 457465b..a4203e0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -165,6 +165,11 @@ public interface IgniteRelVisitor<T> {
T visit(IgniteTableFunctionScan rel);
/**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+ */
+ T visit(IgniteGateway rel);
+
+ /**
* Visits a relational node and calculates a result on the basis of node meta information.
*
* @param rel Relational node.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 6a9dfab..9d9674d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -22,8 +22,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension.ExternalCatalog;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension.ExternalSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.TableImpl;
@@ -35,6 +40,8 @@ import org.jetbrains.annotations.Nullable;
public class SchemaHolderImpl implements SchemaHolder {
private final Map<String, IgniteSchema> igniteSchemas = new HashMap<>();
+ private final Map<String, Schema> externalCatalogs = new HashMap<>();
+
private final Runnable onSchemaUpdatedCallback;
private volatile SchemaPlus calciteSchema;
@@ -57,6 +64,28 @@ public class SchemaHolderImpl implements SchemaHolder {
return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
}
+ /**
+ * Register an external catalog under given name.
+ *
+ * @param name Name of the external catalog.
+ * @param catalog Catalog to register.
+ */
+ public synchronized void registerExternalCatalog(String name, ExternalCatalog catalog) {
+ catalog.schemaNames().forEach(schemaName -> registerExternalSchema(name, schemaName, catalog.schema(schemaName)));
+
+ rebuild();
+ }
+
+ private void registerExternalSchema(String catalogName, String schemaName, ExternalSchema schema) {
+ Map<String, Table> tables = new HashMap<>();
+
+ schema.tableNames().forEach(name -> tables.put(name, schema.table(name)));
+
+ SchemaPlus schemaPlus = (SchemaPlus) externalCatalogs.computeIfAbsent(catalogName, n -> Frameworks.createRootSchema(false));
+
+ schemaPlus.add(schemaName, new ExternalSchemaHolder(tables));
+ }
+
public synchronized void onSchemaCreated(String schemaName) {
igniteSchemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
rebuild();
@@ -126,8 +155,12 @@ public class SchemaHolderImpl implements SchemaHolder {
private void rebuild() {
SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
+
newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+
igniteSchemas.forEach(newCalciteSchema::add);
+ externalCatalogs.forEach(newCalciteSchema::add);
+
calciteSchema = newCalciteSchema;
onSchemaUpdatedCallback.run();
@@ -136,4 +169,16 @@ public class SchemaHolderImpl implements SchemaHolder {
private static String removeSchema(String schemaName, String canonicalName) {
return canonicalName.substring(schemaName.length() + 1);
}
+
+ private static class ExternalSchemaHolder extends AbstractSchema {
+ private final Map<String, Table> tables;
+
+ public ExternalSchemaHolder(Map<String, Table> tables) {
+ this.tables = tables;
+ }
+
+ @Override protected Map<String, Table> getTableMap() {
+ return tables;
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index f16a989..c53da16 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -36,6 +36,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
@@ -63,6 +65,7 @@ import org.apache.calcite.util.Pair;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
@@ -70,13 +73,11 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchang
import org.jetbrains.annotations.Nullable;
/**
- * TraitUtils.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * TraitUtils. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class TraitUtils {
/**
- * Enforce.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Enforce. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@Nullable
public static RelNode enforce(RelNode rel, RelTraitSet toTraits) {
@@ -119,6 +120,14 @@ public class TraitUtils {
RelTraitDef converter = fromTrait.getTraitDef();
+ if (converter == ConventionTraitDef.INSTANCE) {
+ return convertConvention(planner, (Convention) toTrait, rel);
+ }
+
+ if (rel.getConvention() != IgniteConvention.INSTANCE) {
+ return null;
+ }
+
if (converter == RelCollationTraitDef.INSTANCE) {
return convertCollation(planner, (RelCollation) toTrait, rel);
} else if (converter == DistributionTraitDef.INSTANCE) {
@@ -130,9 +139,20 @@ public class TraitUtils {
}
}
+ private static RelNode convertConvention(RelOptPlanner planner, Convention toTrait, RelNode rel) {
+ Convention fromTrait = rel.getConvention();
+
+ if (fromTrait.satisfies(toTrait)) {
+ return rel;
+ }
+
+ RelTraitSet traits = rel.getTraitSet().replace(toTrait);
+
+ return new IgniteGateway(fromTrait.getName(), rel.getCluster(), traits, rel);
+ }
+
/**
- * Convert collation.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Convert collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@Nullable
public static RelNode convertCollation(RelOptPlanner planner,
@@ -149,8 +169,7 @@ public class TraitUtils {
}
/**
- * Convert distribution.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Convert distribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@Nullable
public static RelNode convertDistribution(RelOptPlanner planner,
@@ -186,8 +205,7 @@ public class TraitUtils {
}
/**
- * Convert rewindability.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Convert rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@Nullable
public static RelNode convertRewindability(RelOptPlanner planner,
@@ -231,8 +249,7 @@ public class TraitUtils {
}
/**
- * Distribution.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Distribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static IgniteDistribution distribution(RelNode rel) {
return rel instanceof IgniteRel
@@ -241,16 +258,14 @@ public class TraitUtils {
}
/**
- * Distribution.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Distribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static IgniteDistribution distribution(RelTraitSet traits) {
return traits.getTrait(DistributionTraitDef.INSTANCE);
}
/**
- * Collation.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static RelCollation collation(RelNode rel) {
return rel instanceof IgniteRel
@@ -259,16 +274,14 @@ public class TraitUtils {
}
/**
- * Collation.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static RelCollation collation(RelTraitSet traits) {
return traits.getTrait(RelCollationTraitDef.INSTANCE);
}
/**
- * Rewindability.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static RewindabilityTrait rewindability(RelNode rel) {
return rel instanceof IgniteRel
@@ -277,16 +290,14 @@ public class TraitUtils {
}
/**
- * Rewindability.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static RewindabilityTrait rewindability(RelTraitSet traits) {
return traits.getTrait(RewindabilityTraitDef.INSTANCE);
}
/**
- * Correlation.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Correlation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static CorrelationTrait correlation(RelNode rel) {
return rel instanceof IgniteRel
@@ -295,16 +306,14 @@ public class TraitUtils {
}
/**
- * Correlation.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Correlation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static CorrelationTrait correlation(RelTraitSet traits) {
return traits.getTrait(CorrelationTraitDef.INSTANCE);
}
/**
- * ChangeTraits.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * ChangeTraits. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static RelInput changeTraits(RelInput input, RelTrait... traits) {
RelTraitSet traitSet = input.getTraitSet();
@@ -435,8 +444,7 @@ public class TraitUtils {
}
/**
- * ProjectCollation.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * ProjectCollation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static RelCollation projectCollation(RelCollation collation, List<RexNode> projects, RelDataType inputRowType) {
if (collation.getFieldCollations().isEmpty()) {
@@ -449,8 +457,7 @@ public class TraitUtils {
}
/**
- * ProjectDistribution.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * ProjectDistribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static IgniteDistribution projectDistribution(IgniteDistribution distribution, List<RexNode> projects,
RelDataType inputRowType) {
@@ -464,16 +471,23 @@ public class TraitUtils {
}
/**
- * PassThrough.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * PassThrough. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static Pair<RelTraitSet, List<RelTraitSet>> passThrough(TraitsAwareIgniteRel rel, RelTraitSet requiredTraits) {
- if (requiredTraits.getConvention() != IgniteConvention.INSTANCE || rel.getInputs().isEmpty()) {
+ return passThrough(IgniteConvention.INSTANCE, rel, requiredTraits);
+ }
+
+ /**
+ * PassThrough. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ */
+ public static Pair<RelTraitSet, List<RelTraitSet>> passThrough(Convention convention, TraitsAwareIgniteRel rel,
+ RelTraitSet requiredTraits) {
+ if (requiredTraits.getConvention() != convention || rel.getInputs().isEmpty()) {
return null;
}
List<RelTraitSet> inTraits = Collections.nCopies(rel.getInputs().size(),
- rel.getCluster().traitSetOf(IgniteConvention.INSTANCE));
+ rel.getCluster().traitSetOf(convention));
List<Pair<RelTraitSet, List<RelTraitSet>>> traits = new PropagationContext(Set.of(Pair.of(requiredTraits, inTraits)))
.propagate((in, outs) -> singletonListFromNullable(rel.passThroughCollation(in, outs)))
@@ -488,19 +502,29 @@ public class TraitUtils {
}
/**
- * Derive.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Derive. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static List<RelNode> derive(TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
+ return derive(IgniteConvention.INSTANCE, rel, inTraits);
+ }
+
+ /**
+ * Derive. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ */
+ public static List<RelNode> derive(Convention convention, TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
assert !nullOrEmpty(inTraits);
- RelTraitSet outTraits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTraits = rel.getCluster().traitSetOf(convention);
Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = combinations(outTraits, inTraits);
if (combinations.isEmpty()) {
return List.of();
}
+ if (inTraits.stream().flatMap(List::stream).anyMatch(traitSet -> traitSet.getConvention() != convention)) {
+ return List.of();
+ }
+
return new PropagationContext(combinations)
.propagate(rel::deriveCollation)
.propagate(rel::deriveDistribution)
@@ -510,8 +534,7 @@ public class TraitUtils {
}
/**
- * SingletonListFromNullable.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * SingletonListFromNullable. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
* @param elem Elem.
*/
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
index 4605162..2f87e4d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.processors.query.calcite.util;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
@@ -37,6 +41,7 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.processors.query.calcite.QueryCancel;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
@@ -44,6 +49,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.logger.NullLogger;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Base query context.
@@ -99,6 +105,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private final RexBuilder rexBuilder;
private final QueryCancel qryCancel;
+
+ private final Map<String, SqlExtension> extensions;
private CalciteCatalogReader catalogReader;
@@ -108,7 +116,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private BaseQueryContext(
FrameworkConfig cfg,
Context parentCtx,
- IgniteLogger log
+ IgniteLogger log,
+ Map<String, SqlExtension> extensions
) {
super(Contexts.chain(parentCtx, cfg.getContext()));
@@ -116,6 +125,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
this.cfg = Frameworks.newConfigBuilder(cfg).context(this).build();
this.log = log;
+ this.extensions = extensions;
RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem());
@@ -133,7 +143,15 @@ public final class BaseQueryContext extends AbstractQueryContext {
public static BaseQueryContext empty() {
return EMPTY_CONTEXT;
}
-
+
+ public List<SqlExtension> extensions() {
+ return new ArrayList<>(extensions.values());
+ }
+
+ public @Nullable SqlExtension extension(String name) {
+ return extensions.get(name);
+ }
+
public FrameworkConfig config() {
return cfg;
}
@@ -198,6 +216,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private Context parentCtx = Contexts.empty();
private IgniteLogger log = new NullLogger();
+
+ private Map<String, SqlExtension> extensions = Collections.emptyMap();
public Builder frameworkConfig(@NotNull FrameworkConfig frameworkCfg) {
this.frameworkCfg = frameworkCfg;
@@ -213,9 +233,14 @@ public final class BaseQueryContext extends AbstractQueryContext {
this.log = log;
return this;
}
+
+ public Builder extensions(Map<String, SqlExtension> extensions) {
+ this.extensions = extensions;
+ return this;
+ }
public BaseQueryContext build() {
- return new BaseQueryContext(frameworkCfg, parentCtx, log);
+ return new BaseQueryContext(frameworkCfg, parentCtx, log, extensions);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 2a9110d..01be38e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorI
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractMultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
@@ -825,8 +824,4 @@ public final class Commons {
public static RelOptCluster cluster() {
return CLUSTER;
}
-
- public static MappingQueryContext mapContext(String locNodeId, long topVer) {
- return new MappingQueryContext(locNodeId, topVer);
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java
index f3ea81f..0b8e52d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.util;
+import static org.apache.calcite.util.Util.last;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.nativeTypeToClass;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.transform;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -227,11 +228,11 @@ public class TypeUtils {
return typeFactory.getResultClass(type);
}
- RelOptTable table = schema.getTableForMember(origin.subList(0, 2));
+ RelOptTable table = schema.getTableForMember(origin.subList(0, origin.size() - 1));
assert table != null;
- ColumnDescriptor fldDesc = table.unwrap(TableDescriptor.class).columnDescriptor(origin.get(2));
+ ColumnDescriptor fldDesc = table.unwrap(TableDescriptor.class).columnDescriptor(last(origin));
assert fldDesc != null;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index a761c42..9363c4e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -173,7 +172,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(plan);
- plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+ plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -251,7 +250,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(plan);
- plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+ plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -334,7 +333,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(plan);
- plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+ plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
assertEquals(3, plan.fragments().size());
}
@@ -415,7 +414,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(plan);
- plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+ plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -497,7 +496,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(plan);
- plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+ plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
assertEquals(3, plan.fragments().size());
}
@@ -574,7 +573,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(plan);
- plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+ plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -835,4 +834,8 @@ public class PlannerTest extends AbstractPlannerTest {
@Nullable Predicate<ClusterNode> filter) {
return single ? select(NODES, 0) : select(NODES, 0, 1, 2, 3);
}
+
+ private static MappingQueryContext mapContext(String locNodeId, long topVer) {
+ return new MappingQueryContext(null, locNodeId, topVer);
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/ItSqlExtensionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/ItSqlExtensionTest.java
new file mode 100644
index 0000000..9403464
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/ItSqlExtensionTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite;
+
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.calcite.extension.TestExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases for SQL Extension API.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
+public class ItSqlExtensionTest extends AbstractBasicIntegrationTest {
+ /**
+ * Before all.
+ */
+ @BeforeAll
+ static void initTestData() {
+ createAndPopulateTable();
+ }
+
+ @Test
+ public void test() {
+ TestExtension.allNodes = CLUSTER_NODES.stream()
+ .map(i -> (IgniteImpl) i)
+ .map(IgniteImpl::id)
+ .collect(Collectors.toList());
+
+ assertQuery(""
+ + "select t.node_id,"
+ + " t.num,"
+ + " p.name"
+ + " from person p"
+ + " join TEST_EXT.CUSTOM_SCHEMA.TEST_TBL t "
+ + " on p.id = t.num"
+ + " where t.num <> 1")
+ .columnNames("NODE_ID", "NUM", "NAME")
+ .columnTypes(String.class, Integer.class, String.class)
+ .returns(nodeId(0), 2, "Ilya")
+ .returns(nodeId(1), 2, "Ilya")
+ .returns(nodeId(2), 2, "Ilya")
+ .returns(nodeId(0), 3, "Roma")
+ .returns(nodeId(1), 3, "Roma")
+ .returns(nodeId(2), 3, "Roma")
+ .check();
+ }
+
+ private String nodeId(int idx) {
+ return ((IgniteImpl) CLUSTER_NODES.get(idx)).id();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestExtension.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestExtension.java
new file mode 100644
index 0000000..22af344
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestExtension.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import org.apache.ignite.internal.processors.query.calcite.extension.CatalogUpdateListener;
+import org.apache.ignite.internal.processors.query.calcite.extension.ExternalConvention;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A test extension implementation.
+ */
+public class TestExtension implements SqlExtension {
+ private static final String EXTENSION_NAME = "TEST_EXT";
+
+ private static final String TEST_TABLE_NAME = "TEST_TBL";
+
+ private static final String TEST_SCHEMA_NAME = "CUSTOM_SCHEMA";
+
+ static final Convention CONVENTION = new ExternalConvention(EXTENSION_NAME, IgniteRel.class);
+
+ public static List<String> allNodes;
+
+ /** {@inheritDoc} */
+ @Override
+ public String name() {
+ return EXTENSION_NAME;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void init(CatalogUpdateListener catalogUpdateListener) {
+ catalogUpdateListener.onCatalogUpdated(new ExternalCatalogImpl());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ if (phase != PlannerPhase.OPTIMIZATION) {
+ return Set.of();
+ }
+
+ return Set.of(TestFilterConverterRule.INSTANCE);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> RelImplementor<RowT> implementor() {
+ return new RelImplementor<>() {
+ @Override
+ public Node<RowT> implement(ExecutionContext<RowT> ctx, IgniteRel node) {
+ if (node instanceof TestPhysFilter) {
+ return implement(ctx, (TestPhysFilter) node);
+ }
+
+ if (node instanceof TestPhysTableScan) {
+ return implement(ctx, (TestPhysTableScan) node);
+ }
+
+ throw new AssertionError("Unexpected node " + (node != null ? "of class " + node.getClass().getName() : "null"));
+ }
+
+ private Node<RowT> implement(ExecutionContext<RowT> ctx, TestPhysTableScan scan) {
+ RowHandler.RowFactory<RowT> factory = ctx.rowHandler().factory(ctx.getTypeFactory(), scan.getRowType());
+
+ return new ScanNode<>(
+ ctx, scan.getRowType(), List.of(
+ factory.create(ctx.localNodeId(), 1, UUID.randomUUID().toString()),
+ factory.create(ctx.localNodeId(), 2, UUID.randomUUID().toString()),
+ factory.create(ctx.localNodeId(), 3, UUID.randomUUID().toString())
+ )
+ );
+ }
+
+ private Node<RowT> implement(ExecutionContext<RowT> ctx, TestPhysFilter filter) {
+ Predicate<RowT> pred = ctx.expressionFactory().predicate(filter.getCondition(), filter.getRowType());
+
+ FilterNode<RowT> node = new FilterNode<>(ctx, filter.getRowType(), pred);
+
+ Node<RowT> input = implement(ctx, (IgniteRel) filter.getInput());
+
+ node.register(input);
+
+ return node;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ColocationGroup colocationGroup(IgniteRel node) {
+ return ColocationGroup.forNodes(allNodes);
+ }
+
+ private static class ExternalCatalogImpl implements ExternalCatalog {
+ private final Map<String, ExternalSchema> schemas = Map.of(TEST_SCHEMA_NAME, new ExternalSchemaImpl());
+
+ /** {@inheritDoc} */
+ @Override
+ public List<String> schemaNames() {
+ return List.of(TEST_SCHEMA_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable ExternalSchema schema(String name) {
+ return schemas.get(name);
+ }
+ }
+
+ private static class ExternalSchemaImpl implements ExternalSchema {
+ private final Map<String, IgniteTable> tables = Map.of(TEST_TABLE_NAME, new TestTableImpl(
+ new TableDescriptorImpl(
+ List.of(
+ new ColumnDescriptorImpl("NODE_ID", true, 0, NativeTypes.stringOf(256), null),
+ new ColumnDescriptorImpl("NUM", true, 1, NativeTypes.INT32, null),
+ new ColumnDescriptorImpl("VAL", false, 2, NativeTypes.stringOf(256), null)
+ )
+ )
+ ));
+
+ /** {@inheritDoc} */
+ @Override
+ public List<String> tableNames() {
+ return List.of(TEST_TABLE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable IgniteTable table(String name) {
+ return tables.get(name);
+ }
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestFilterConverterRule.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestFilterConverterRule.java
new file mode 100644
index 0000000..d31b5c4
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestFilterConverterRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A converter from logical filter to the test physical filter relation.
+ */
+public class TestFilterConverterRule extends ConverterRule {
+ /** An instance of this rule. */
+ public static final RelOptRule INSTANCE = new TestFilterConverterRule();
+
+ private TestFilterConverterRule() {
+ super(Config.INSTANCE
+ .withConversion(LogicalFilter.class, Convention.NONE, TestExtension.CONVENTION, "TestFilterConverter"));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable RelNode convert(RelNode rel) {
+ RelOptCluster cluster = rel.getCluster();
+ RelTraitSet traits = rel.getTraitSet()
+ .replace(TestExtension.CONVENTION)
+ .replace(IgniteDistributions.single());
+
+ LogicalFilter filter = (LogicalFilter) rel;
+
+ RelTraitSet inputTraits = filter.getInput().getTraitSet()
+ .replace(TestExtension.CONVENTION)
+ .replace(IgniteDistributions.single());
+
+ return new TestPhysFilter(cluster, traits, convert(filter.getInput(), inputTraits), filter.getCondition());
+ }
+
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysFilter.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysFilter.java
new file mode 100644
index 0000000..1452052
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysFilter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
+
+/**
+ * A test filter node.
+ */
+public class TestPhysFilter extends Filter implements TraitsAwareIgniteRel {
+ /**
+ * Constructor.
+ *
+ * @param cluster Cluster this node belongs to.
+ * @param traits A set of the traits this node satisfy.
+ * @param input An input relation.
+ * @param condition The predicate to filter put input rows.
+ */
+ public TestPhysFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) {
+ super(cluster, traits, input, condition);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param input Context to recover this relation from.
+ */
+ public TestPhysFilter(RelInput input) {
+ super(changeTraits(input, TestExtension.CONVENTION));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new TestPhysFilter(getCluster(), traitSet, input, condition);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return planner.getCostFactory().makeZeroCost();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new TestPhysFilter(cluster, getTraitSet(), sole(inputs), getCondition());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ if (!TraitUtils.rewindability(inTraits.get(0)).rewindable() && RexUtils.hasCorrelation(getCondition())) {
+ return List.of();
+ }
+
+ return List.of(Pair.of(nodeTraits.replace(TraitUtils.rewindability(inTraits.get(0))),
+ inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ return List.of(Pair.of(nodeTraits.replace(TraitUtils.distribution(inTraits.get(0))),
+ inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ return List.of(Pair.of(nodeTraits.replace(TraitUtils.collation(inTraits.get(0))),
+ inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(getCondition());
+
+ corrIds.addAll(TraitUtils.correlation(inTraits.get(0)).correlationIds());
+
+ return List.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)), inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<RelNode> derive(List<List<RelTraitSet>> inputTraits) {
+ return TraitUtils.derive(TestExtension.CONVENTION, this, inputTraits);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) {
+ return TraitUtils.passThrough(TestExtension.CONVENTION, this, required);
+ }
+
+
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysTableScan.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysTableScan.java
new file mode 100644
index 0000000..0fc0a3b
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysTableScan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A scan over test table.
+ */
+public class TestPhysTableScan extends TableScan implements IgniteRel {
+ /**
+ * Constructor.
+ *
+ * @param cluster Cluster this node belongs to.
+ * @param traitSet A set of the traits this node satisfy.
+ * @param hints A list of hints applicable to the current node.
+ * @param table The actual table to be scanned.
+ */
+ protected TestPhysTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+ List<RelHint> hints, RelOptTable table) {
+ super(cluster, traitSet, hints, table);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param input Context to recover this relation from.
+ */
+ public TestPhysTableScan(RelInput input) {
+ super(changeTraits(input, TestExtension.CONVENTION));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return (T) this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new TestPhysTableScan(cluster, getTraitSet(), getHints(), getTable());
+ }
+
+ @Override
+ public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq);
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestTableImpl.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestTableImpl.java
new file mode 100644
index 0000000..37191d8
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestTableImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A test table implementation.
+ */
+class TestTableImpl extends AbstractTable implements IgniteTable {
+ private final TableDescriptor desc;
+
+ /**
+ * Constructor.
+ *
+ * @param desc A descriptor of the table.
+ */
+ public TestTableImpl(TableDescriptor desc) {
+ this.desc = desc;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableDescriptor descriptor() {
+ return desc;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet requiredColumns) {
+ return desc.rowType((IgniteTypeFactory) typeFactory, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Statistic getStatistic() {
+ return new Statistic() {
+ @Override
+ public @Nullable List<RelCollation> getCollations() {
+ return List.of();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
+ RelTraitSet traits = cluster.traitSetOf(TestExtension.CONVENTION)
+ .replace(desc.distribution());
+
+ return new TestPhysTableScan(
+ cluster, traits, List.of(), relOptTbl
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteDistribution distribution() {
+ return desc.distribution();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <C> C unwrap(Class<C> cls) {
+ if (cls.isInstance(desc)) {
+ return cls.cast(desc);
+ }
+
+ return super.unwrap(cls);
+ }
+}
diff --git a/modules/runner/src/integrationTest/resources/META-INF/services/org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension b/modules/runner/src/integrationTest/resources/META-INF/services/org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension
new file mode 100644
index 0000000..4de4690
--- /dev/null
+++ b/modules/runner/src/integrationTest/resources/META-INF/services/org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension
@@ -0,0 +1 @@
+org.apache.ignite.internal.calcite.extension.TestExtension
\ No newline at end of file
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0bd11d3..c529ff2 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -385,6 +385,13 @@ public class IgniteImpl implements Ignite {
}
/**
+ * Returns the id of the current node.
+ */
+ public String id() {
+ return clusterSvc.topologyService().localMember().id();
+ }
+
+ /**
* Checks node status. If it's {@link Status#STOPPING} then prevents further starting and throws NodeStoppingException that will lead to
* stopping already started components later on, otherwise starts component and add it to started components list.
*