You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/11/17 16:05:07 UTC
[ignite-3] branch main updated: IGNITE-15912 Merge SQL calcite query contexts refactoring to 3.0 (#454)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26025e8 IGNITE-15912 Merge SQL calcite query contexts refactoring to 3.0 (#454)
26025e8 is described below
commit 26025e87d5c8d4beaa6b72de42a3ad49207893fc
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Wed Nov 17 19:05:02 2021 +0300
IGNITE-15912 Merge SQL calcite query contexts refactoring to 3.0 (#454)
---
.../processors/query/calcite/QueryCancel.java | 92 +++++++++
...emaHolder.java => QueryCancelledException.java} | 19 +-
.../query/calcite/exec/ExchangeServiceImpl.java | 15 +-
.../query/calcite/exec/ExecutionContext.java | 64 ++++--
.../query/calcite/exec/ExecutionServiceImpl.java | 122 +++++++-----
.../query/calcite/exec/LogicalRelImplementor.java | 4 +-
.../calcite/exec/exp/agg/AccumulatorsFactory.java | 3 +-
.../query/calcite/externalize/RelJson.java | 7 +-
.../query/calcite/externalize/RelJsonReader.java | 23 +--
.../query/calcite/externalize/RelJsonWriter.java | 7 +-
.../calcite/metadata/IgniteMdFragmentMapping.java | 106 +++++-----
.../query/calcite/metadata/IgniteMetadata.java | 8 +-
.../query/calcite/metadata/RelMetadataQueryEx.java | 7 +-
.../calcite/prepare/AbstractMultiStepPlan.java | 5 +-
.../processors/query/calcite/prepare/Fragment.java | 32 +--
.../query/calcite/prepare/FragmentPlan.java | 3 +-
.../MappingQueryContext.java} | 31 ++-
.../query/calcite/prepare/MultiStepPlan.java | 3 +-
.../query/calcite/prepare/PlanningContext.java | 206 +------------------
.../query/calcite/prepare/QueryPlanCache.java | 11 +-
.../query/calcite/prepare/QueryPlanCacheImpl.java | 5 +-
.../query/calcite/prepare/QueryTemplate.java | 17 +-
.../query/calcite/schema/IgniteTableImpl.java | 4 +-
.../query/calcite/schema/InternalIgniteTable.java | 4 +-
.../query/calcite/schema/SchemaHolder.java | 5 +-
.../query/calcite/schema/SchemaHolderImpl.java | 5 +-
.../AbstractQueryContext.java} | 27 ++-
.../query/calcite/util/BaseQueryContext.java | 221 +++++++++++++++++++++
.../processors/query/calcite/util/Commons.java | 27 ++-
.../query/calcite/util/IgniteMethod.java | 5 +-
.../query/calcite/exec/RuntimeSortedIndexTest.java | 15 +-
.../calcite/exec/rel/AbstractExecutionTest.java | 15 +-
.../query/calcite/planner/AbstractPlannerTest.java | 43 ++--
.../query/calcite/planner/PlannerTest.java | 143 ++++++-------
.../java/org/apache/ignite/logger/NullLogger.java | 173 ++++++++++++++++
35 files changed, 924 insertions(+), 553 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryCancel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryCancel.java
new file mode 100644
index 0000000..7c56c9f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryCancel.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.Cancellable;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Holds query cancel state.
+ */
+public class QueryCancel {
+ private final List<Cancellable> cancelActions = new ArrayList<>(3);
+
+ private boolean canceled;
+
+ /**
+ * Adds a cancel action.
+ *
+ * @param clo Add cancel action.
+ */
+ public synchronized void add(Cancellable clo) throws QueryCancelledException {
+ assert clo != null;
+
+ if (canceled) {
+ throw new QueryCancelledException();
+ }
+
+ cancelActions.add(clo);
+ }
+
+ /**
+ * Executes cancel closure.
+ */
+ public synchronized void cancel() {
+ if (canceled) {
+ return;
+ }
+
+ canceled = true;
+
+ IgniteException ex = null;
+
+ // Run actions in the reverse order.
+ for (int i = cancelActions.size() - 1; i >= 0; i--) {
+ try {
+ Cancellable act = cancelActions.get(i);
+
+ act.cancel();
+ } catch (Exception e) {
+ if (ex == null) {
+ ex = new IgniteException(e);
+ } else {
+ ex.addSuppressed(e);
+ }
+ }
+ }
+
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ /**
+ * Stops query execution if a user requested cancel.
+ */
+ public synchronized void checkCancelled() throws QueryCancelledException {
+ if (canceled) {
+ throw new QueryCancelledException();
+ }
+ }
+
+ public synchronized boolean isCanceled() {
+ return canceled;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryCancelledException.java
similarity index 62%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryCancelledException.java
index 2864fe9..72702e0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryCancelledException.java
@@ -15,17 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query.calcite;
-import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.lang.IgniteException;
/**
- * SchemaHolder interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * The exception is thrown if a query was cancelled or timed out while executing.
*/
-public interface SchemaHolder {
+public class QueryCancelledException extends IgniteException {
+ private static final long serialVersionUID = 0L;
+
+ public static final String ERR_MSG = "The query was cancelled while executing.";
+
/**
- * Get schema.
+ * Default constructor.
*/
- SchemaPlus schema();
+ public QueryCancelledException() {
+ super(ERR_MSG);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 52a45fd..9c5583b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMes
import org.apache.ignite.internal.processors.query.calcite.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.processors.query.calcite.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -48,6 +48,8 @@ public class ExchangeServiceImpl implements ExchangeService {
private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
+ private final String localNodeId;
+
private final QueryTaskExecutor taskExecutor;
private final MailboxRegistry mailboxRegistry;
@@ -59,10 +61,12 @@ public class ExchangeServiceImpl implements ExchangeService {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public ExchangeServiceImpl(
+ String localNodeId,
QueryTaskExecutor taskExecutor,
MailboxRegistry mailboxRegistry,
MessageService msgSrvc
) {
+ this.localNodeId = localNodeId;
this.taskExecutor = taskExecutor;
this.mailboxRegistry = mailboxRegistry;
this.msgSrvc = msgSrvc;
@@ -246,11 +250,14 @@ public class ExchangeServiceImpl implements ExchangeService {
*/
private ExecutionContext<?> baseInboxContext(String nodeId, UUID qryId, long fragmentId) {
return new ExecutionContext<>(
- taskExecutor,
- PlanningContext.builder()
- .originatingNodeId(nodeId)
+ BaseQueryContext.builder()
+ .logger(LOG)
.build(),
+ taskExecutor,
qryId,
+ localNodeId,
+ nodeId,
+ -1,
new FragmentDescription(
fragmentId,
null,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 698c673..4e1cdca 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -34,8 +34,9 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFa
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.lang.IgniteInternalException;
@@ -44,7 +45,7 @@ import org.jetbrains.annotations.NotNull;
/**
* Runtime context allowing access to the tables in a database.
*/
-public class ExecutionContext<RowT> implements DataContext {
+public class ExecutionContext<RowT> extends AbstractQueryContext implements DataContext {
private static final TimeZone TIME_ZONE = TimeZone.getDefault(); // TODO DistributedSqlConfiguration#timeZone
/**
@@ -52,15 +53,21 @@ public class ExecutionContext<RowT> implements DataContext {
*/
private static final Locale LOCALE = Locale.ENGLISH;
- private final UUID qryId;
+ private final BaseQueryContext qctx;
- private final PlanningContext ctx;
+ private final QueryTaskExecutor executor;
+
+ private final UUID qryId;
private final FragmentDescription fragmentDesc;
private final Map<String, Object> params;
- private final QueryTaskExecutor executor;
+ private final String locNodeId;
+
+ private final String originatingNodeId;
+
+ private final long topVer;
private final RowHandler<RowT> handler;
@@ -80,7 +87,7 @@ public class ExecutionContext<RowT> implements DataContext {
* Constructor.
*
* @param executor Task executor.
- * @param ctx Parent context.
+ * @param qctx Base query context.
* @param qryId Query ID.
* @param fragmentDesc Partitions information.
* @param handler Row handler.
@@ -88,34 +95,39 @@ public class ExecutionContext<RowT> implements DataContext {
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public ExecutionContext(
+ BaseQueryContext qctx,
QueryTaskExecutor executor,
- PlanningContext ctx,
UUID qryId,
+ String locNodeId,
+ String originatingNodeId,
+ long topVer,
FragmentDescription fragmentDesc,
RowHandler<RowT> handler,
Map<String, Object> params
) {
+ super(qctx);
+
this.executor = executor;
- this.ctx = ctx;
+ this.qctx = qctx;
this.qryId = qryId;
this.fragmentDesc = fragmentDesc;
this.handler = handler;
this.params = params;
+ this.locNodeId = locNodeId;
+ this.originatingNodeId = originatingNodeId;
+ this.topVer = topVer;
- expressionFactory = new ExpressionFactoryImpl<>(this, ctx.typeFactory(), ctx.conformance());
+ expressionFactory = new ExpressionFactoryImpl<>(
+ this,
+ this.qctx.typeFactory(),
+ this.qctx.config().getParserConfig().conformance()
+ );
long ts = System.currentTimeMillis();
startTs = ts + TIME_ZONE.getOffset(ts);
}
/**
- * Get parent context.
- */
- public PlanningContext planningContext() {
- return ctx;
- }
-
- /**
* Get query ID.
*/
public UUID queryId() {
@@ -181,19 +193,33 @@ public class ExecutionContext<RowT> implements DataContext {
* Get originating node ID.
*/
public String originatingNodeId() {
- return planningContext().originatingNodeId();
+ return originatingNodeId;
+ }
+
+ /**
+ * Get local node ID.
+ */
+ public String localNodeId() {
+ return locNodeId;
+ }
+
+ /**
+ * Get topology version.
+ */
+ public long topologyVersion() {
+ return topVer;
}
/** {@inheritDoc} */
@Override
public SchemaPlus getRootSchema() {
- return ctx.schema();
+ return qctx.schema();
}
/** {@inheritDoc} */
@Override
public IgniteTypeFactory getTypeFactory() {
- return ctx.typeFactory();
+ return qctx.typeFactory();
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 582d2be..50a86a4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -36,10 +36,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlDdl;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
@@ -88,6 +89,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToC
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.NodeLeaveHandler;
import org.apache.ignite.internal.processors.query.calcite.util.TransformingIterator;
@@ -164,7 +166,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
ddlConverter = new DdlSqlToCommandConverter();
iteratorsHolder = new ClosableIteratorsHolder(topSrvc.localMember().name(), LOG);
mailboxRegistry = new MailboxRegistryImpl(topSrvc);
- exchangeSrvc = new ExchangeServiceImpl(taskExecutor, mailboxRegistry, msgSrvc);
+ exchangeSrvc = new ExchangeServiceImpl(locNodeId, taskExecutor, mailboxRegistry, msgSrvc);
mappingSrvc = new MappingServiceImpl(topSrvc);
// TODO: fix this
affSrvc = cacheId -> Objects::hashCode;
@@ -191,9 +193,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
String qry,
Object[] params
) {
- QueryPlan plan = qryPlanCache.queryPlan(new CacheKey(getDefaultSchema(schema).getName(), qry));
+ QueryPlan plan = qryPlanCache.queryPlan(new CacheKey(schemaHolder.schema(schema).getName(), qry));
if (plan != null) {
- PlanningContext pctx = createContext(schema, qry, params);
+ PlanningContext pctx = createContext(Contexts.empty(), schema, qry, params);
return Collections.singletonList(executePlan(UUID.randomUUID(), pctx, plan));
}
@@ -202,13 +204,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
List<SqlCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
for (final SqlNode qry0 : qryList) {
- PlanningContext pctx = createContext(schema, qry0.toString(), params);
+ PlanningContext pctx = createContext(Contexts.empty(), schema, qry0.toString(), params);
if (qryList.size() == 1) {
plan = qryPlanCache.queryPlan(
- pctx,
new CacheKey(pctx.schemaName(), pctx.query()),
- pctx0 -> prepareSingle(qry0, pctx0)
+ () -> prepareSingle(qry0, pctx)
);
} else {
plan = prepareSingle(qry0, pctx);
@@ -220,8 +221,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
return cursors;
}
- private SqlCursor<List<?>> executeQuery(UUID qryId, MultiStepPlan plan, PlanningContext pctx) {
- plan.init(pctx);
+ private SqlCursor<List<?>> mapAndExecutePlan(
+ UUID qryId,
+ MultiStepPlan plan,
+ BaseQueryContext qctx,
+ Object[] params
+ ) {
+ plan.init(mappingSrvc, new MappingQueryContext(locNodeId, topologyVersion()));
List<Fragment> fragments = plan.fragments();
@@ -237,7 +243,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
List<String> nodes = mapping.nodeIds();
- assert nodes != null && nodes.size() == 1 && first(nodes).equals(pctx.localNodeId());
+ assert nodes != null && nodes.size() == 1 && first(nodes).equals(locNodeId);
}
FragmentDescription fragmentDesc = new FragmentDescription(
@@ -247,12 +253,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
plan.remotes(fragment));
ExecutionContext<RowT> ectx = new ExecutionContext<>(
+ qctx,
taskExecutor,
- pctx,
qryId,
+ locNodeId,
+ locNodeId,
+ topologyVersion(),
fragmentDesc,
handler,
- Commons.parametersMap(pctx.parameters()));
+ Commons.parametersMap(params));
Node<RowT> node = new LogicalRelImplementor<>(ectx, affSrvc, mailboxRegistry,
exchangeSrvc).go(fragment.root());
@@ -280,11 +289,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
QueryStartRequest req = FACTORY.queryStartRequest()
.queryId(qryId)
.fragmentId(fragment.fragmentId())
- .schema(pctx.schemaName())
+ .schema(qctx.schemaName())
.root(fragment.serialized())
- .topologyVersion(pctx.topologyVersion())
+ .topologyVersion(ectx.topologyVersion())
.fragmentDescription(fragmentDesc)
- .parameters(pctx.parameters())
+ .parameters(params)
.build();
msgSrvc.send(nodeId, req);
@@ -347,31 +356,26 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
return 1L;
}
- private PlanningContext createContext(@Nullable String schema, String qry, Object[] params) {
- return createContext(topologyVersion(), locNodeId, schema, qry, params);
+ private BaseQueryContext createQueryContext(Context parent, @Nullable String schema) {
+ return BaseQueryContext.builder()
+ .parentContext(parent)
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schemaHolder.schema(schema))
+ .build()
+ )
+ .logger(LOG)
+ .build();
}
- private PlanningContext createContext(long topVer, String originator,
- @Nullable String schema, String qry, Object[] params) {
+ private PlanningContext createContext(Context parent, @Nullable String schema, String qry, Object[] params) {
return PlanningContext.builder()
- .localNodeId(locNodeId)
- .originatingNodeId(originator)
- .parentContext(Contexts.empty())
- .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema != null
- ? schemaHolder.schema().getSubSchema(schema)
- : schemaHolder.schema())
- .build())
+ .parentContext(createQueryContext(parent, schema))
.query(qry)
.parameters(params)
- .topologyVersion(topVer)
.build();
}
- private SchemaPlus getDefaultSchema(String schema) {
- return schema != null ? schemaHolder.schema().getSubSchema(schema) : schemaHolder.schema();
- }
-
private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
IgnitePlanner planner = ctx.planner();
@@ -385,13 +389,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
// Split query plan to query fragments.
List<Fragment> fragments = new Splitter().go(igniteRel);
- QueryTemplate template = new QueryTemplate(mappingSrvc, fragments);
+ QueryTemplate template = new QueryTemplate(fragments);
return new MultiStepQueryPlan(template, resultSetMetadata(ctx, validated.dataType(), validated.origins()));
}
- private QueryPlan prepareFragment(PlanningContext ctx) {
- return new FragmentPlan(fromJson(ctx, ctx.query()));
+ private QueryPlan prepareFragment(BaseQueryContext ctx, String jsonFragment) {
+ return new FragmentPlan(fromJson(ctx, jsonFragment));
}
private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
@@ -444,7 +448,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
// Split query plan to query fragments.
List<Fragment> fragments = new Splitter().go(igniteRel);
- QueryTemplate template = new QueryTemplate(mappingSrvc, fragments);
+ QueryTemplate template = new QueryTemplate(fragments);
return new MultiStepDmlPlan(template, resultSetMetadata(ctx, igniteRel.getRowType(), null));
}
@@ -488,19 +492,24 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
case DML:
// TODO a barrier between previous operation and this one
case QUERY:
- return executeQuery(qryId, (MultiStepPlan) plan, pctx);
+ return mapAndExecutePlan(
+ qryId,
+ (MultiStepPlan) plan,
+ pctx.unwrap(BaseQueryContext.class),
+ pctx.parameters()
+ );
case EXPLAIN:
return executeExplain((ExplainPlan) plan);
case DDL:
- return executeDdl((DdlPlan) plan, pctx);
+ return executeDdl((DdlPlan) plan);
default:
throw new AssertionError("Unexpected plan type: " + plan);
}
}
- private SqlCursor<List<?>> executeDdl(DdlPlan plan, PlanningContext pctx) {
- throw new UnsupportedOperationException("plan=" + plan + ", ctx=" + pctx);
+ private SqlCursor<List<?>> executeDdl(DdlPlan plan) {
+ throw new UnsupportedOperationException("plan=" + plan);
}
private SqlCursor<List<?>> executeExplain(ExplainPlan plan) {
@@ -511,12 +520,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
return cur;
}
- private void executeFragment(UUID qryId, FragmentPlan plan, PlanningContext pctx, FragmentDescription fragmentDesc) {
- ExecutionContext<RowT> ectx = new ExecutionContext<>(taskExecutor, pctx, qryId,
- fragmentDesc, handler, Commons.parametersMap(pctx.parameters()));
-
- long frId = fragmentDesc.fragmentId();
- String origNodeId = pctx.originatingNodeId();
+ private void executeFragment(UUID qryId, FragmentPlan plan, ExecutionContext<RowT> ectx) {
+ String origNodeId = ectx.originatingNodeId();
Outbox<RowT> node = new LogicalRelImplementor<>(
ectx,
@@ -530,7 +535,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
origNodeId,
FACTORY.queryStartResponse()
.queryId(qryId)
- .fragmentId(frId)
+ .fragmentId(ectx.fragmentId())
.build()
);
} catch (IgniteInternalCheckedException e) {
@@ -564,19 +569,28 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
assert nodeId != null && msg != null;
try {
- PlanningContext pctx = createContext(msg.topologyVersion(), nodeId, msg.schema(),
- msg.root(), msg.parameters());
+ final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema());
QueryPlan qryPlan = qryPlanCache.queryPlan(
- pctx,
- new CacheKey(pctx.schemaName(), pctx.query()),
- this::prepareFragment
+ new CacheKey(msg.schema(), msg.root()),
+ () -> prepareFragment(qctx, msg.root())
);
-
FragmentPlan plan = (FragmentPlan) qryPlan;
- executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
+ ExecutionContext<RowT> ectx = new ExecutionContext<>(
+ qctx,
+ taskExecutor,
+ msg.queryId(),
+ locNodeId,
+ nodeId,
+ msg.topologyVersion(),
+ msg.fragmentDescription(),
+ handler,
+ Commons.parametersMap(msg.parameters())
+ );
+
+ executeFragment(msg.queryId(), plan, ectx);
} catch (Throwable ex) {
LOG.error("Failed to start query fragment", ex);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index eb47a85..6286559 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -183,7 +183,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
IgniteDistribution distr = rel.distribution();
Destination<RowT> dest = distr.destination(ctx, affSrvc, ctx.group(rel.sourceId()));
- String localNodeId = ctx.planningContext().localNodeId();
+ String localNodeId = ctx.localNodeId();
FilterNode<RowT> node = new FilterNode<>(ctx, rel.getRowType(), r -> Objects.equals(localNodeId, first(dest.targets(r))));
@@ -330,7 +330,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
ctx,
rowType,
tbl,
- group.partitions(ctx.planningContext().localNodeId()),
+ group.partitions(ctx.localNodeId()),
filters,
prj,
requiredColumns
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
index 59a5b4a..67fb7d9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorsFactory.java
@@ -47,7 +47,6 @@ import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.Primitives;
@@ -82,7 +81,7 @@ public class AccumulatorsFactory<RowT> implements Supplier<List<AccumulatorWrapp
}
private static Function<Object, Object> cast0(Pair<RelDataType, RelDataType> types) {
- IgniteTypeFactory typeFactory = PlanningContext.empty().typeFactory();
+ IgniteTypeFactory typeFactory = Commons.typeFactory();
RelDataType from = types.left;
RelDataType to = types.right;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index 41092ee..f32666e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -114,8 +114,6 @@ import org.apache.ignite.lang.IgniteException;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
class RelJson {
- private final RelOptCluster cluster;
-
@SuppressWarnings("PublicInnerClass")
@FunctionalInterface
public interface RelFactory extends Function<RelInput, RelNode> {
@@ -225,8 +223,7 @@ class RelJson {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- RelJson(RelOptCluster cluster) {
- this.cluster = cluster;
+ RelJson() {
}
Function<RelInput, RelNode> factory(String type) {
@@ -381,7 +378,7 @@ class RelJson {
private Object toJson(RexNode node) {
// removes calls to SEARCH and the included Sarg and converts them to comparisons
- node = RexUtil.expandSearch(cluster.getRexBuilder(), null, node);
+ node = RexUtil.expandSearch(Commons.cluster().getRexBuilder(), null, node);
Map<String, Object> map;
switch (node.getKind()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
index f2ab7b2..9a980b2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
@@ -46,7 +46,7 @@ import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.lang.IgniteException;
@@ -61,8 +61,6 @@ public class RelJsonReader {
private final ObjectMapper mapper = new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
- private final RelOptCluster cluster;
-
private final RelOptSchema relOptSchema;
private final RelJson relJson;
@@ -75,8 +73,8 @@ public class RelJsonReader {
* FromJson.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public static <T extends RelNode> T fromJson(PlanningContext ctx, String json) {
- RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+ public static <T extends RelNode> T fromJson(BaseQueryContext ctx, String json) {
+ RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
return (T) reader.read(json);
}
@@ -85,11 +83,10 @@ public class RelJsonReader {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public RelJsonReader(RelOptCluster cluster, RelOptSchema relOptSchema) {
- this.cluster = cluster;
+ public RelJsonReader(RelOptSchema relOptSchema) {
this.relOptSchema = relOptSchema;
- relJson = new RelJson(cluster);
+ relJson = new RelJson();
}
/**
@@ -133,13 +130,13 @@ public class RelJsonReader {
/** {@inheritDoc} */
@Override
public RelOptCluster getCluster() {
- return cluster;
+ return Commons.cluster();
}
/** {@inheritDoc} */
@Override
public RelTraitSet getTraitSet() {
- return cluster.traitSet();
+ return Commons.cluster().traitSet();
}
/** {@inheritDoc} */
@@ -277,7 +274,7 @@ public class RelJsonReader {
@Override
public RelDataType getRowType(String tag) {
Object o = jsonRel.get(tag);
- return relJson.toType(Commons.typeFactory(cluster), o);
+ return relJson.toType(Commons.typeFactory(), o);
}
/** {@inheritDoc} */
@@ -286,7 +283,7 @@ public class RelJsonReader {
List<RexNode> expressionList = getExpressionList(expressionsTag);
List<String> names =
(List<String>) get(fieldsTag);
- return Commons.typeFactory(cluster).createStructType(
+ return Commons.typeFactory().createStructType(
new AbstractList<Map.Entry<String, RelDataType>>() {
@Override
public Map.Entry<String, RelDataType> get(int index) {
@@ -359,7 +356,7 @@ public class RelJsonReader {
Boolean distinct = (Boolean) jsonAggCall.get("distinct");
List<Integer> operands = (List<Integer>) jsonAggCall.get("operands");
Integer filterOperand = (Integer) jsonAggCall.get("filter");
- RelDataType type = relJson.toType(Commons.typeFactory(cluster), jsonAggCall.get("type"));
+ RelDataType type = relJson.toType(Commons.typeFactory(), jsonAggCall.get("type"));
String name = (String) jsonAggCall.get("name");
return AggregateCall.create(aggregation, distinct, false, false, operands,
filterOperand == null ? -1 : filterOperand,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
index 36215db..2c4f0a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -59,7 +58,7 @@ public class RelJsonWriter implements RelWriter {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public static String toJson(RelNode rel) {
- RelJsonWriter writer = new RelJsonWriter(rel.getCluster(), PRETTY_PRINT);
+ RelJsonWriter writer = new RelJsonWriter(PRETTY_PRINT);
rel.explain(writer);
return writer.asString();
@@ -69,10 +68,10 @@ public class RelJsonWriter implements RelWriter {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public RelJsonWriter(RelOptCluster cluster, boolean pretty) {
+ public RelJsonWriter(boolean pretty) {
this.pretty = pretty;
- relJson = new RelJson(cluster);
+ relJson = new RelJson();
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
index a9ccf8d..2c3139a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMappingMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
@@ -41,11 +42,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgniteTable;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
/**
- * Implementation class for {@link RelMetadataQueryEx#fragmentMapping(RelNode)} method call.
+ * Implementation class for {@link RelMetadataQueryEx#fragmentMapping(RelNode, MappingQueryContext)} method call.
*/
public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingMetadata> {
/**
@@ -55,6 +55,19 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
ReflectiveRelMetadataProvider.reflectiveSource(
IgniteMethod.FRAGMENT_MAPPING.method(), new IgniteMdFragmentMapping());
+ /**
+ * Fragment info calculation entry point.
+ *
+ * @param rel Root node of a calculated fragment.
+ * @param mq Metadata query instance.
+ * @return Fragment meta information.
+ */
+ public static FragmentMapping fragmentMappingForMetadataQuery(RelNode rel, RelMetadataQuery mq, MappingQueryContext ctx) {
+ assert mq instanceof RelMetadataQueryEx;
+
+ return ((RelMetadataQueryEx) mq).fragmentMapping(rel, ctx);
+ }
+
/** {@inheritDoc} */
@Override
public MetadataDef<FragmentMappingMetadata> getDef() {
@@ -68,38 +81,39 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
* @param mq Metadata query instance. Used to request appropriate metadata from node children.
* @return Nodes mapping, representing a list of nodes capable to execute a query over particular partitions.
*/
- public FragmentMapping fragmentMapping(RelNode rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(RelNode rel, RelMetadataQuery mq, MappingQueryContext ctx) {
throw new AssertionError();
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(RelSubset rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(RelSubset rel, RelMetadataQuery mq, MappingQueryContext ctx) {
throw new AssertionError();
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(SingleRel rel, RelMetadataQuery mq) {
- return fragmentMappingForMetadataQuery(rel.getInput(), mq);
+ public FragmentMapping fragmentMapping(SingleRel rel, RelMetadataQuery mq, MappingQueryContext ctx) {
+ return fragmentMappingForMetadataQuery(rel.getInput(), mq, ctx);
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*
- * <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the fragment (which part
- * the parent node is) cannot be executed on any node and additional exchange is needed. This case we throw
- * {@link NodeMappingException} with an edge, where we need the additional exchange. After the exchange is put into the fragment and
- * the fragment is split into two ones, fragment meta information will be recalculated for all fragments.
+ * <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the fragment
+ * (which part the parent node is) cannot be executed on any node and additional exchange is needed.
+ * This case we throw {@link NodeMappingException} with an edge, where we need the additional exchange.
+ * After the exchange is put into the fragment and the fragment is split into two ones,
+ * fragment meta information will be recalculated for all fragments.
*/
- public FragmentMapping fragmentMapping(BiRel rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(BiRel rel, RelMetadataQuery mq, MappingQueryContext ctx) {
RelNode left = rel.getLeft();
RelNode right = rel.getRight();
- FragmentMapping frgLeft = fragmentMappingForMetadataQuery(left, mq);
- FragmentMapping frgRight = fragmentMappingForMetadataQuery(right, mq);
+ FragmentMapping frgLeft = fragmentMappingForMetadataQuery(left, mq, ctx);
+ FragmentMapping frgRight = fragmentMappingForMetadataQuery(right, mq, ctx);
try {
return frgLeft.colocate(frgRight);
@@ -122,7 +136,7 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}
*
* <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the
* fragment (which part the parent node is) cannot be executed on any node and additional exchange is needed. This
@@ -130,18 +144,19 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
* exchange is put into the fragment and the fragment is split into two ones, fragment meta information will be
* recalculated for all fragments.
*/
- public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq, MappingQueryContext ctx) {
FragmentMapping res = null;
if (TraitUtils.distribution(rel) == IgniteDistributions.random()) {
for (RelNode input : rel.getInputs()) {
- res = res == null ? fragmentMappingForMetadataQuery(input, mq) : res.combine(fragmentMappingForMetadataQuery(input, mq));
+ res = res == null ? fragmentMappingForMetadataQuery(input, mq, ctx) : res.combine(
+ fragmentMappingForMetadataQuery(input, mq, ctx));
}
} else {
for (RelNode input : rel.getInputs()) {
try {
- res = res == null
- ? fragmentMappingForMetadataQuery(input, mq) : res.colocate(fragmentMappingForMetadataQuery(input, mq));
+ res = res == null ? fragmentMappingForMetadataQuery(input, mq, ctx) : res.colocate(
+ fragmentMappingForMetadataQuery(input, mq, ctx));
} catch (ColocationMappingException e) {
throw new NodeMappingException("Failed to calculate physical distribution", input, e);
}
@@ -152,75 +167,62 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*
* <p>Prunes involved partitions (hence nodes, involved in query execution) if possible.
*/
- public FragmentMapping fragmentMapping(IgniteFilter rel, RelMetadataQuery mq) {
- return fragmentMappingForMetadataQuery(rel.getInput(), mq).prune(rel);
+ public FragmentMapping fragmentMapping(IgniteFilter rel, RelMetadataQuery mq, MappingQueryContext ctx) {
+ return fragmentMappingForMetadataQuery(rel.getInput(), mq, ctx).prune(rel);
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*
* <p>Prunes involved partitions (hence nodes, involved in query execution) if possible.
*/
- public FragmentMapping fragmentMapping(IgniteTrimExchange rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(IgniteTrimExchange rel, RelMetadataQuery mq, MappingQueryContext ctx) {
try {
return FragmentMapping.create(rel.sourceId())
- .colocate(fragmentMappingForMetadataQuery(rel.getInput(), mq));
+ .colocate(fragmentMappingForMetadataQuery(rel.getInput(), mq, ctx));
} catch (ColocationMappingException e) {
throw new AssertionError(e);
}
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(IgniteReceiver rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(IgniteReceiver rel, RelMetadataQuery mq, MappingQueryContext ctx) {
return FragmentMapping.create(rel.exchangeId());
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(IgniteIndexScan rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(IgniteIndexScan rel, RelMetadataQuery mq, MappingQueryContext ctx) {
return FragmentMapping.create(rel.sourceId(),
- rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(Commons.context(rel)));
+ rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(ctx));
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(IgniteTableScan rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(IgniteTableScan rel, RelMetadataQuery mq, MappingQueryContext ctx) {
return FragmentMapping.create(rel.sourceId(),
- rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(Commons.context(rel)));
+ rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(ctx));
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(IgniteValues rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(IgniteValues rel, RelMetadataQuery mq, MappingQueryContext ctx) {
return FragmentMapping.create();
}
/**
- * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}.
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
*/
- public FragmentMapping fragmentMapping(IgniteTableFunctionScan rel, RelMetadataQuery mq) {
+ public FragmentMapping fragmentMapping(IgniteTableFunctionScan rel, RelMetadataQuery mq, MappingQueryContext ctx) {
return FragmentMapping.create();
}
-
- /**
- * Fragment info calculation entry point.
- *
- * @param rel Root node of a calculated fragment.
- * @param mq Metadata query instance.
- * @return Fragment meta information.
- */
- public static FragmentMapping fragmentMappingForMetadataQuery(RelNode rel, RelMetadataQuery mq) {
- assert mq instanceof RelMetadataQueryEx;
-
- return ((RelMetadataQueryEx) mq).fragmentMapping(rel);
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 35480f9..35dd55a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.metadata.MetadataDef;
import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
/**
@@ -53,19 +54,18 @@ public class IgniteMetadata {
DefaultRelMetadataProvider.INSTANCE));
/**
- * FragmentMappingMetadata interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Describes the method signature for the methods of the {@link IgniteMdFragmentMapping}.
*/
public interface FragmentMappingMetadata extends Metadata {
MetadataDef<FragmentMappingMetadata> DEF = MetadataDef.of(FragmentMappingMetadata.class,
FragmentMappingMetadata.Handler.class, IgniteMethod.FRAGMENT_MAPPING.method());
/** Determines how the rows are distributed. */
- FragmentMapping fragmentMapping();
+ FragmentMapping fragmentMapping(MappingQueryContext ctx);
/** Handler API. */
interface Handler extends MetadataHandler<FragmentMappingMetadata> {
- FragmentMapping fragmentMapping(RelNode r, RelMetadataQuery mq);
+ FragmentMapping fragmentMapping(RelNode r, RelMetadataQuery mq, MappingQueryContext ctx);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 2f3b883..b28462d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.reflections.Reflections;
import org.reflections.scanners.SubTypesScanner;
@@ -88,10 +89,10 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
* @param rel Relational node.
* @return Fragment meta information.
*/
- public FragmentMapping fragmentMapping(RelNode rel) {
- for (; ; ) {
+ public FragmentMapping fragmentMapping(RelNode rel, MappingQueryContext ctx) {
+ for (;;) {
try {
- return sourceDistributionHandler.fragmentMapping(rel, this);
+ return sourceDistributionHandler.fragmentMapping(rel, this, ctx);
} catch (JaninoRelMetadataProvider.NoHandler e) {
sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentMappingMetadata.DEF);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 8dac7af..ed29bbe 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -102,7 +103,7 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
/** {@inheritDoc} */
@Override
- public void init(PlanningContext ctx) {
- executionPlan = queryTemplate.map(ctx);
+ public void init(MappingService mappingService, MappingQueryContext ctx) {
+ executionPlan = queryTemplate.map(mappingService, ctx);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index f1e77c8..1d74e8a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -22,7 +22,6 @@ import static org.apache.ignite.internal.processors.query.calcite.externalize.Re
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
@@ -35,6 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.NotNull;
@@ -106,9 +106,9 @@ public class Fragment {
return mapping;
}
- private FragmentMapping mapping(PlanningContext ctx, RelMetadataQuery mq, Supplier<List<String>> nodesSource) {
+ private FragmentMapping mapping(MappingQueryContext ctx, RelMetadataQuery mq, Supplier<List<String>> nodesSource) {
try {
- FragmentMapping mapping = IgniteMdFragmentMapping.fragmentMappingForMetadataQuery(root, mq);
+ FragmentMapping mapping = IgniteMdFragmentMapping.fragmentMappingForMetadataQuery(root, mq, ctx);
if (rootFragment()) {
mapping = FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
@@ -141,24 +141,8 @@ public class Fragment {
return !(root instanceof IgniteSender);
}
- /**
- * Attach.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
- public Fragment attach(PlanningContext ctx) {
- RelOptCluster cluster = ctx.cluster();
-
- return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
- }
-
- /**
- * Detach.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
- public Fragment detach() {
- RelOptCluster cluster = PlanningContext.empty().cluster();
-
- return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
+ public Fragment copy() {
+ return new Cloner(Commons.cluster()).go(this);
}
/**
@@ -167,9 +151,7 @@ public class Fragment {
* @param ctx Planner context.
* @param mq Metadata query.
*/
- Fragment map(MappingService mappingSrvc, PlanningContext ctx, RelMetadataQuery mq) throws FragmentMappingException {
- assert root.getCluster() == ctx.cluster() : "Fragment is detached [fragment=" + this + "]";
-
+ Fragment map(MappingService mappingSrvc, MappingQueryContext ctx, RelMetadataQuery mq) throws FragmentMappingException {
if (mapping != null) {
return this;
}
@@ -178,7 +160,7 @@ public class Fragment {
}
@NotNull
- private Supplier<List<String>> nodesSource(MappingService mappingSrvc, PlanningContext ctx) {
+ private Supplier<List<String>> nodesSource(MappingService mappingSrvc, MappingQueryContext ctx) {
return () -> mappingSrvc.executionNodes(ctx.topologyVersion(), single(), null);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
index abc3c08..f1e5524 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* FragmentPlan.
@@ -32,7 +33,7 @@ public class FragmentPlan implements QueryPlan {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public FragmentPlan(IgniteRel root) {
- RelOptCluster cluster = PlanningContext.empty().cluster();
+ RelOptCluster cluster = Commons.cluster();
this.root = new Cloner(cluster).visit(root);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
similarity index 58%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 2864fe9..90f240b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -15,17 +15,32 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.schema;
-
-import org.apache.calcite.schema.SchemaPlus;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
/**
- * SchemaHolder interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Query mapping context.
*/
-public interface SchemaHolder {
+public class MappingQueryContext {
+ private final String locNodeId;
+
+ private final long topVer;
+
/**
- * Get schema.
+ * Constructor.
+ *
+ * @param locNodeId Local node identifier.
+ * @param topVer Topology version to map.
*/
- SchemaPlus schema();
+ public MappingQueryContext(String locNodeId, long topVer) {
+ this.locNodeId = locNodeId;
+ this.topVer = topVer;
+ }
+
+ public String localNodeId() {
+ return locNodeId;
+ }
+
+ public long topologyVersion() {
+ return topVer;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
index e60b141..d8bd80d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -21,6 +21,7 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.List;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
/**
* Regular query or DML.
@@ -54,5 +55,5 @@ public interface MultiStepPlan extends QueryPlan {
*
* @param ctx Planner context.
*/
- void init(PlanningContext ctx);
+ void init(MappingService mappingService, MappingQueryContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index 3f08ad2..8e6be74 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -17,104 +17,52 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import static org.apache.calcite.tools.Frameworks.createRootSchema;
-import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
-
-import java.util.Properties;
import java.util.function.Function;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.config.CalciteConnectionConfigImpl;
-import org.apache.calcite.config.CalciteConnectionProperty;
-import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RuleSet;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.jetbrains.annotations.NotNull;
/**
* Planning context.
*/
public final class PlanningContext implements Context {
- private static final PlanningContext EMPTY = builder().build();
-
- private final FrameworkConfig cfg;
-
private final Context parentCtx;
- private final String locNodeId;
-
- private final String originatingNodeId;
-
private final String qry;
private final Object[] parameters;
- private final long topVer;
-
- private final IgniteTypeFactory typeFactory;
-
private Function<RuleSet, RuleSet> rulesFilter;
private IgnitePlanner planner;
- private CalciteConnectionConfig connCfg;
-
- private CalciteCatalogReader catalogReader;
-
/**
* Private constructor, used by a builder.
*/
private PlanningContext(
- FrameworkConfig cfg,
Context parentCtx,
- String locNodeId,
- String originatingNodeId,
String qry,
- Object[] parameters,
- long topVer
+ Object[] parameters
) {
- this.locNodeId = locNodeId;
- this.originatingNodeId = originatingNodeId;
this.qry = qry;
this.parameters = parameters;
- this.topVer = topVer;
-
- this.parentCtx = Contexts.chain(parentCtx, cfg.getContext());
- // link frameworkConfig#context() to this.
- this.cfg = Frameworks.newConfigBuilder(cfg).context(this).build();
-
- RelDataTypeSystem typeSys = connectionConfig().typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem());
- typeFactory = new IgniteTypeFactory(typeSys);
- }
-
- /**
- * Get local node ID.
- */
- public String localNodeId() {
- return locNodeId;
- }
-
- /**
- * Get originating node ID (the node, who started the execution).
- */
- public String originatingNodeId() {
- return originatingNodeId == null ? locNodeId : originatingNodeId;
+ this.parentCtx = parentCtx;
}
/**
* Get framework config.
*/
public FrameworkConfig config() {
- return cfg;
+ return unwrap(BaseQueryContext.class).config();
}
/**
@@ -132,13 +80,6 @@ public final class PlanningContext implements Context {
return parameters;
}
- /**
- * Get topology version.
- */
- public long topologyVersion() {
- return topVer;
- }
-
// Helper methods
/**
@@ -152,7 +93,7 @@ public final class PlanningContext implements Context {
* Get sql conformance.
*/
public SqlConformance conformance() {
- return cfg.getParserConfig().conformance();
+ return config().getParserConfig().conformance();
}
/**
@@ -177,61 +118,21 @@ public final class PlanningContext implements Context {
* Get schema.
*/
public SchemaPlus schema() {
- return cfg.getDefaultSchema();
+ return config().getDefaultSchema();
}
/**
* Get type factory.
*/
public IgniteTypeFactory typeFactory() {
- return typeFactory;
- }
-
- /**
- * Get connection config. Defines connected user parameters like TimeZone or Locale.
- */
- public CalciteConnectionConfig connectionConfig() {
- if (connCfg != null) {
- return connCfg;
- }
-
- CalciteConnectionConfig connCfg = unwrap(CalciteConnectionConfig.class);
-
- if (connCfg != null) {
- return this.connCfg = connCfg;
- }
-
- Properties props = new Properties();
-
- props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
- String.valueOf(cfg.getParserConfig().caseSensitive()));
- props.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(),
- String.valueOf(cfg.getParserConfig().conformance()));
- props.setProperty(CalciteConnectionProperty.MATERIALIZATIONS_ENABLED.camelName(),
- String.valueOf(true));
-
- return this.connCfg = new CalciteConnectionConfigImpl(props);
+ return unwrap(BaseQueryContext.class).typeFactory();
}
/**
* Get new catalog reader.
*/
public CalciteCatalogReader catalogReader() {
- if (catalogReader != null) {
- return catalogReader;
- }
-
- SchemaPlus dfltSchema = schema();
- SchemaPlus rootSchema = dfltSchema;
-
- while (rootSchema.getParentSchema() != null) {
- rootSchema = rootSchema.getParentSchema();
- }
-
- return catalogReader = new CalciteCatalogReader(
- CalciteSchema.from(rootSchema),
- CalciteSchema.from(dfltSchema).path(null),
- typeFactory(), connectionConfig());
+ return unwrap(BaseQueryContext.class).catalogReader();
}
/**
@@ -248,10 +149,6 @@ public final class PlanningContext implements Context {
return clazz.cast(this);
}
- if (clazz.isInstance(connCfg)) {
- return clazz.cast(connCfg);
- }
-
return parentCtx.unwrap(clazz);
}
@@ -262,13 +159,6 @@ public final class PlanningContext implements Context {
return new Builder();
}
- /**
- * Get empty context.
- */
- public static PlanningContext empty() {
- return EMPTY;
- }
-
public RuleSet rules(RuleSet set) {
return rulesFilter != null ? rulesFilter.apply(set) : set;
}
@@ -283,113 +173,35 @@ public final class PlanningContext implements Context {
/**
* Planner context builder.
*/
- @SuppressWarnings("PublicInnerClass")
public static class Builder {
- private static final FrameworkConfig EMPTY_CONFIG =
- Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(createRootSchema(false))
- .traitDefs()
- .build();
-
- private FrameworkConfig frameworkCfg = EMPTY_CONFIG;
-
private Context parentCtx = Contexts.empty();
- private String locNodeId;
-
- private String originatingNodeId;
-
private String qry;
private Object[] parameters;
- private long topVer;
-
- /**
- * Set local node id.
- *
- * @param locNodeId Local node ID.
- * @return Builder for chaining.
- */
- public Builder localNodeId(@NotNull String locNodeId) {
- this.locNodeId = locNodeId;
- return this;
- }
-
- /**
- * Set originating node id.
- *
- * @param originatingNodeId Originating node ID (the node, who started the execution).
- * @return Builder for chaining.
- */
- public Builder originatingNodeId(@NotNull String originatingNodeId) {
- this.originatingNodeId = originatingNodeId;
- return this;
- }
-
- /**
- * Set framework config.
- *
- * @param frameworkCfg Framework config.
- * @return Builder for chaining.
- */
- public Builder frameworkConfig(@NotNull FrameworkConfig frameworkCfg) {
- this.frameworkCfg = frameworkCfg;
- return this;
- }
-
- /**
- * Set parent context.
- *
- * @param parentCtx Parent context.
- * @return Builder for chaining.
- */
public Builder parentContext(@NotNull Context parentCtx) {
this.parentCtx = parentCtx;
return this;
}
- /**
- * Set query.
- *
- * @param qry Query.
- * @return Builder for chaining.
- */
public Builder query(@NotNull String qry) {
this.qry = qry;
return this;
}
- /**
- * Set query parameters.
- *
- * @param parameters Query parameters.
- * @return Builder for chaining.
- */
- @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public Builder parameters(@NotNull Object... parameters) {
this.parameters = parameters;
return this;
}
/**
- * Set topology version.
- *
- * @param topVer Topology version.
- * @return Builder for chaining.
- */
- public Builder topologyVersion(long topVer) {
- this.topVer = topVer;
- return this;
- }
-
- /**
* Builds planner context.
*
* @return Planner context.
*/
public PlanningContext build() {
- return new PlanningContext(frameworkCfg, parentCtx, locNodeId, originatingNodeId, qry, parameters, topVer);
+ return new PlanningContext(parentCtx, qry, parameters);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
index eddd614..03f60db 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.util.function.Supplier;
import org.apache.ignite.internal.processors.query.calcite.exec.LifecycleAware;
/**
@@ -25,15 +26,13 @@ import org.apache.ignite.internal.processors.query.calcite.exec.LifecycleAware;
*/
public interface QueryPlanCache extends LifecycleAware {
/**
- * Get query plan from cache if exists,
- * otherwise prepare plan, put to cache and return the prepared plan.
+ * Get query plan from cache if exists, otherwise prepare plan, put to cache and return the prepared plan.
*
- * @param ctx Context.
- * @param key Cache key.
- * @param factory Factory method to generate a plan on cache miss.
+ * @param key Cache key.
+ * @param planSupplier Factory method to generate a plan on cache miss.
* @return Query plan.
*/
- QueryPlan queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory);
+ QueryPlan queryPlan(CacheKey key, Supplier<QueryPlan> planSupplier);
/**
* Get query plan from cache if exists, otherwise returns {@code null}.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 22d4a9f..effd52b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
/**
* Implementation of {@link QueryPlanCache} that simply wraps a {@link Caffeine} cache.
@@ -41,9 +42,9 @@ public class QueryPlanCacheImpl implements QueryPlanCache {
/** {@inheritDoc} */
@Override
- public QueryPlan queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory) {
+ public QueryPlan queryPlan(CacheKey key, Supplier<QueryPlan> planSupplier) {
Map<CacheKey, QueryPlan> cache = this.cache;
- QueryPlan plan = cache.computeIfAbsent(key, k -> factory.create(ctx));
+ QueryPlan plan = cache.computeIfAbsent(key, k -> planSupplier.get());
return plan.copy();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
index 1d4ff7e..d2cc949 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -39,8 +39,6 @@ import org.jetbrains.annotations.NotNull;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class QueryTemplate {
- private final MappingService mappingService;
-
private final List<Fragment> fragments;
private final AtomicReference<ExecutionPlan> executionPlan = new AtomicReference<>();
@@ -49,12 +47,11 @@ public class QueryTemplate {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public QueryTemplate(MappingService mappingService, List<Fragment> fragments) {
- this.mappingService = mappingService;
+ public QueryTemplate(List<Fragment> fragments) {
List<Fragment> frgs = new ArrayList<>(fragments.size());
for (Fragment fragment : fragments) {
- frgs.add(fragment.detach());
+ frgs.add(fragment.copy());
}
this.fragments = List.copyOf(frgs);
@@ -64,19 +61,19 @@ public class QueryTemplate {
* Map.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public ExecutionPlan map(PlanningContext ctx) {
+ public ExecutionPlan map(MappingService mappingService, MappingQueryContext ctx) {
ExecutionPlan executionPlan = this.executionPlan.get();
if (executionPlan != null && Objects.equals(executionPlan.topologyVersion(), ctx.topologyVersion())) {
return executionPlan;
}
- List<Fragment> fragments = Commons.transform(this.fragments, f -> f.attach(ctx));
+ List<Fragment> fragments = Commons.transform(this.fragments, Fragment::copy);
Exception ex = null;
RelMetadataQuery mq = first(fragments).root().getCluster().getMetadataQuery();
for (int i = 0; i < 3; i++) {
try {
- ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(fragments, ctx, mq));
+ ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(mappingService, fragments, ctx, mq));
if (executionPlan == null || executionPlan.topologyVersion() < executionPlan0.topologyVersion()) {
this.executionPlan.compareAndSet(executionPlan, executionPlan0);
@@ -98,11 +95,11 @@ public class QueryTemplate {
}
@NotNull
- private List<Fragment> map(List<Fragment> fragments, PlanningContext ctx, RelMetadataQuery mq) {
+ private List<Fragment> map(MappingService mappingService, List<Fragment> fragments, MappingQueryContext ctx, RelMetadataQuery mq) {
List<Fragment> frgs = new ArrayList<>();
for (Fragment fragment : fragments) {
- frgs.add(fragment.map(mappingService, ctx, mq).detach());
+ frgs.add(fragment.map(mappingService, ctx, mq));
}
return List.copyOf(frgs);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
index 63cb447..fc0fd21 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTableImpl.java
@@ -41,7 +41,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -141,7 +141,7 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
/** {@inheritDoc} */
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return partitionedGroup();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
index 5144278..2602ddd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/InternalIgniteTable.java
@@ -27,7 +27,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.table.TableImpl;
@@ -119,7 +119,7 @@ public interface InternalIgniteTable extends IgniteTable {
* @param ctx Planning context.
* @return Nodes mapping.
*/
- ColocationGroup colocationGroup(PlanningContext ctx);
+ ColocationGroup colocationGroup(MappingQueryContext ctx);
/**
* Returns all table indexes.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
index 2864fe9..b5c0444 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
import org.apache.calcite.schema.SchemaPlus;
+import org.jetbrains.annotations.Nullable;
/**
* SchemaHolder interface.
@@ -25,7 +26,7 @@ import org.apache.calcite.schema.SchemaPlus;
*/
public interface SchemaHolder {
/**
- * Get schema.
+ * Return specified schema if the schema name is specified or default schema when {@code schema} is {@code null}.
*/
- SchemaPlus schema();
+ SchemaPlus schema(@Nullable String schema);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index f65c628..58bd9ef 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -27,6 +27,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.TableImpl;
+import org.jetbrains.annotations.Nullable;
/**
* Holds actual schema and mutates it on schema change, requested by Ignite.
@@ -52,8 +53,8 @@ public class SchemaHolderImpl implements SchemaHolder {
/** {@inheritDoc} */
@Override
- public SchemaPlus schema() {
- return calciteSchema;
+ public SchemaPlus schema(@Nullable String schema) {
+ return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
}
public synchronized void onSchemaCreated(String schemaName) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/AbstractQueryContext.java
similarity index 61%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/AbstractQueryContext.java
index 2864fe9..efb1eb9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/AbstractQueryContext.java
@@ -15,17 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query.calcite.util;
-import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.plan.Context;
/**
- * SchemaHolder interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Abstract query context.
*/
-public interface SchemaHolder {
- /**
- * Get schema.
- */
- SchemaPlus schema();
+public class AbstractQueryContext implements Context {
+ private final Context parentCtx;
+
+ public AbstractQueryContext(Context parentCtx) {
+ this.parentCtx = parentCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <C> C unwrap(Class<C> cls) {
+ if (cls == getClass()) {
+ return cls.cast(this);
+ }
+
+ return parentCtx.unwrap(cls);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
new file mode 100644
index 0000000..4605162
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
+
+import java.util.Properties;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.processors.query.calcite.QueryCancel;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.logger.NullLogger;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Base query context.
+ */
+public final class BaseQueryContext extends AbstractQueryContext {
+ public static final CalciteConnectionConfig CALCITE_CONNECTION_CONFIG;
+
+ public static final RelOptCluster CLUSTER;
+
+ public static final IgniteTypeFactory TYPE_FACTORY;
+
+ private static final IgniteCostFactory COST_FACTORY = new IgniteCostFactory();
+
+ private static final BaseQueryContext EMPTY_CONTEXT;
+
+ private static final VolcanoPlanner EMPTY_PLANNER;
+
+ private static final RexBuilder DFLT_REX_BUILDER;
+
+ static {
+ Properties props = new Properties();
+
+ props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
+ String.valueOf(FRAMEWORK_CONFIG.getParserConfig().caseSensitive()));
+ props.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(),
+ String.valueOf(FRAMEWORK_CONFIG.getParserConfig().conformance()));
+ props.setProperty(CalciteConnectionProperty.MATERIALIZATIONS_ENABLED.camelName(),
+ String.valueOf(true));
+
+ CALCITE_CONNECTION_CONFIG = new CalciteConnectionConfigImpl(props);
+
+ EMPTY_CONTEXT = builder().build();
+
+ EMPTY_PLANNER = new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT);
+
+ RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, FRAMEWORK_CONFIG.getTypeSystem());
+ TYPE_FACTORY = new IgniteTypeFactory(typeSys);
+
+ DFLT_REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+ CLUSTER = RelOptCluster.create(EMPTY_PLANNER, DFLT_REX_BUILDER);
+
+ CLUSTER.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, EMPTY_PLANNER));
+ CLUSTER.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+ }
+
+ private final FrameworkConfig cfg;
+
+ private final IgniteLogger log;
+
+ private final IgniteTypeFactory typeFactory;
+
+ private final RexBuilder rexBuilder;
+
+ private final QueryCancel qryCancel;
+
+ private CalciteCatalogReader catalogReader;
+
+ /**
+ * Private constructor, used by a builder.
+ */
+ private BaseQueryContext(
+ FrameworkConfig cfg,
+ Context parentCtx,
+ IgniteLogger log
+ ) {
+ super(Contexts.chain(parentCtx, cfg.getContext()));
+
+ // link frameworkConfig#context() to this.
+ this.cfg = Frameworks.newConfigBuilder(cfg).context(this).build();
+
+ this.log = log;
+
+ RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem());
+
+ typeFactory = new IgniteTypeFactory(typeSys);
+
+ qryCancel = unwrap(QueryCancel.class);
+
+ rexBuilder = new RexBuilder(typeFactory);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static BaseQueryContext empty() {
+ return EMPTY_CONTEXT;
+ }
+
+ public FrameworkConfig config() {
+ return cfg;
+ }
+
+ public IgniteLogger logger() {
+ return log;
+ }
+
+ public String schemaName() {
+ return schema().getName();
+ }
+
+ public SchemaPlus schema() {
+ return cfg.getDefaultSchema();
+ }
+
+ public IgniteTypeFactory typeFactory() {
+ return typeFactory;
+ }
+
+ public RexBuilder rexBuilder() {
+ return rexBuilder;
+ }
+
+ /**
+ * Returns calcite catalog reader.
+ */
+ public CalciteCatalogReader catalogReader() {
+ if (catalogReader != null) {
+ return catalogReader;
+ }
+
+ SchemaPlus dfltSchema = schema();
+ SchemaPlus rootSchema = dfltSchema;
+
+ while (rootSchema.getParentSchema() != null) {
+ rootSchema = rootSchema.getParentSchema();
+ }
+
+ return catalogReader = new CalciteCatalogReader(
+ CalciteSchema.from(rootSchema),
+ CalciteSchema.from(dfltSchema).path(null),
+ typeFactory(), CALCITE_CONNECTION_CONFIG);
+ }
+
+ public QueryCancel queryCancel() {
+ return qryCancel;
+ }
+
+ /**
+ * Query context builder.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ public static class Builder {
+ private static final FrameworkConfig EMPTY_CONFIG =
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(createRootSchema(false))
+ .build();
+
+ private FrameworkConfig frameworkCfg = EMPTY_CONFIG;
+
+ private Context parentCtx = Contexts.empty();
+
+ private IgniteLogger log = new NullLogger();
+
+ public Builder frameworkConfig(@NotNull FrameworkConfig frameworkCfg) {
+ this.frameworkCfg = frameworkCfg;
+ return this;
+ }
+
+ public Builder parentContext(@NotNull Context parentCtx) {
+ this.parentCtx = parentCtx;
+ return this;
+ }
+
+ public Builder logger(@NotNull IgniteLogger log) {
+ this.log = log;
+ return this;
+ }
+
+ public BaseQueryContext build() {
+ return new BaseQueryContext(frameworkCfg, parentCtx, log);
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 106c1f9..2a9110d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.util;
+import static org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext.CLUSTER;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.io.Reader;
@@ -85,6 +86,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorI
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractMultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
@@ -318,17 +320,24 @@ public final class Commons {
}
/**
- * Extracts planner context.
+ * Standalone type factory.
+ */
+ public static IgniteTypeFactory typeFactory() {
+ return typeFactory(cluster());
+ }
+
+ /**
+ * Extracts query context.
*/
- public static PlanningContext context(RelNode rel) {
+ public static BaseQueryContext context(RelNode rel) {
return context(rel.getCluster());
}
/**
- * Extracts planner context.
+ * Extracts query context.
*/
- public static PlanningContext context(RelOptCluster cluster) {
- return context(cluster.getPlanner().getContext());
+ public static BaseQueryContext context(RelOptCluster cluster) {
+ return Objects.requireNonNull(cluster.getPlanner().getContext().unwrap(BaseQueryContext.class));
}
/**
@@ -812,4 +821,12 @@ public final class Commons {
return parser.parseStmtList();
}
+
+ public static RelOptCluster cluster() {
+ return CLUSTER;
+ }
+
+ public static MappingQueryContext mapContext(String locNodeId, long topVer) {
+ return new MappingQueryContext(locNodeId, topVer);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index d4d888f..3203042 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.BiScalar;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteSqlFunctions;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.SingleScalar;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMappingMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
/**
* Contains methods used in metadata definitions.
@@ -56,8 +57,8 @@ public enum IgniteMethod {
SYSTEM_RANGE3(IgniteSqlFunctions.class, "systemRange", Object.class, Object.class, Object.class),
- /** See {@link FragmentMappingMetadata#fragmentMapping()}. */
- FRAGMENT_MAPPING(FragmentMappingMetadata.class, "fragmentMapping");
+ /** See {@link FragmentMappingMetadata#fragmentMapping(MappingQueryContext)}. */
+ FRAGMENT_MAPPING(FragmentMappingMetadata.class, "fragmentMapping", MappingQueryContext.class);
private final Method method;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
index bc0a7ed..02a81a6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.query.calcite.exec;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import com.google.common.collect.ImmutableMap;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Date;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelCollations;
@@ -32,8 +34,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.Cursor;
@@ -104,13 +106,18 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest {
private RuntimeSortedIndex<Object[]> generate(RelDataType rowType, final List<Integer> idxCols, int notUnique) {
RuntimeSortedIndex<Object[]> idx = new RuntimeSortedIndex<>(
new ExecutionContext<>(
- null,
- PlanningContext.builder()
+ BaseQueryContext.builder()
+ .logger(log)
.build(),
null,
+ UUID.randomUUID(),
+ "fake-test-node",
+ "fake-test-node",
+ 0,
null,
ArrayRowHandler.INSTANCE,
- null),
+ ImmutableMap.of()
+ ),
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
(o1, o2) -> {
for (int colIdx : idxCols) {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index b42a65c..61671df 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+import com.google.common.collect.ImmutableMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.jetbrains.annotations.NotNull;
@@ -67,14 +67,17 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
protected ExecutionContext<Object[]> executionContext() {
FragmentDescription fragmentDesc = new FragmentDescription(0, null, null, null);
return new ExecutionContext<>(
- taskExecutor,
- PlanningContext.builder()
- .localNodeId(UUID.randomUUID().toString())
+ BaseQueryContext.builder()
+ .logger(log)
.build(),
+ taskExecutor,
UUID.randomUUID(),
+ "fake-test-node",
+ "fake-test-node",
+ 0,
fragmentDesc,
ArrayRowHandler.INSTANCE,
- Map.of()
+ ImmutableMap.of()
);
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index e1fe1b3..e9f2f27 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -41,7 +41,6 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
@@ -77,6 +76,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.query.calcite.schema.TableDescripto
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -113,7 +114,7 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
private String lastErrorMsg;
interface TestVisitor {
- public void visit(RelNode node, int ordinal, RelNode parent);
+ void visit(RelNode node, int ordinal, RelNode parent);
}
/**
@@ -205,10 +206,15 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
.add("PUBLIC", publicSchema);
PlanningContext ctx = PlanningContext.builder()
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .build())
+ .parentContext(BaseQueryContext.builder()
+ .frameworkConfig(
+ newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build()
+ )
+ .logger(log)
+ .build()
+ )
.query(sql)
.build();
@@ -480,22 +486,21 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
nodes.add(UUID.randomUUID().toString());
}
- PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(nodes))
- .originatingNodeId(first(nodes))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .build())
+ BaseQueryContext ctx = BaseQueryContext.builder()
+ .frameworkConfig(
+ newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build()
+ )
+ .logger(log)
.build();
+
List<RelNode> deserializedNodes = new ArrayList<>();
- try (IgnitePlanner ignored = ctx.planner()) {
- for (String s : serialized) {
- RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
- deserializedNodes.add(reader.read(s));
- }
+ for (String s : serialized) {
+ RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
+ deserializedNodes.add(reader.read(s));
}
List<RelNode> expectedRels = fragments.stream()
@@ -662,7 +667,7 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
/** {@inheritDoc} */
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
throw new AssertionError();
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 2a07243..a761c42 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
-import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -30,7 +29,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
-import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollations;
@@ -44,6 +42,7 @@ import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
@@ -60,6 +59,9 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeAll;
@@ -101,7 +103,7 @@ public class PlannerTest extends AbstractPlannerTest {
}
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
select(NODES, 0, 1),
select(NODES, 1, 2),
@@ -122,7 +124,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("NAME", f.createJavaType(String.class))
.add("VER", f.createJavaType(Integer.class))
.build()) {
- @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
select(NODES, 0, 1),
select(NODES, 1, 2),
@@ -151,11 +153,11 @@ public class PlannerTest extends AbstractPlannerTest {
+ "ON d.id = p.id0";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build())
.build())
.query(sql)
.parameters(2)
@@ -167,12 +169,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(ctx);
+ plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -190,7 +191,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("PROJECTID", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 0, 1, 2, 3));
}
@@ -207,7 +208,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("VER", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 0, 1, 2, 3));
}
@@ -232,11 +233,11 @@ public class PlannerTest extends AbstractPlannerTest {
+ "WHERE (d.projectId + 1) > ?";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build())
.build())
.query(sql)
.parameters(2)
@@ -246,12 +247,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(ctx);
+ plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -269,7 +269,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("PROJECTID", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 0));
}
@@ -286,7 +286,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("VER", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
select(NODES, 1, 2),
select(NODES, 2, 3),
@@ -316,11 +316,11 @@ public class PlannerTest extends AbstractPlannerTest {
+ "WHERE (d.projectId + 1) > ?";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build())
.build())
.query(sql)
.parameters(2)
@@ -330,12 +330,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(ctx);
+ plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
assertEquals(3, plan.fragments().size());
}
@@ -351,7 +350,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("PROJECTID", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 1, 2, 3));
}
@@ -368,7 +367,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("VER", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
select(NODES, 0),
select(NODES, 1),
@@ -398,11 +397,11 @@ public class PlannerTest extends AbstractPlannerTest {
+ "WHERE (d.projectId + 1) > ?";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build())
.build())
.query(sql)
.parameters(2)
@@ -412,12 +411,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(ctx);
+ plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -435,7 +433,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("PROJECTID", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 0));
}
@@ -451,7 +449,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("VER", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
select(NODES, 1),
select(NODES, 2),
@@ -481,11 +479,11 @@ public class PlannerTest extends AbstractPlannerTest {
+ "WHERE (d.projectId + 1) > ?";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build())
.build())
.query(sql)
.parameters(2)
@@ -495,12 +493,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(ctx);
+ plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
assertEquals(3, plan.fragments().size());
}
@@ -516,7 +513,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("PROJECTID", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 2));
}
@@ -533,7 +530,7 @@ public class PlannerTest extends AbstractPlannerTest {
.add("VER", f.createJavaType(Integer.class))
.build()) {
@Override
- public ColocationGroup colocationGroup(PlanningContext ctx) {
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forNodes(select(NODES, 0, 1));
}
@@ -559,11 +556,11 @@ public class PlannerTest extends AbstractPlannerTest {
+ "WHERE (d.projectId + 1) > ?";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build())
.build())
.query(sql)
.parameters(2)
@@ -573,12 +570,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(ctx);
+ plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
assertNotNull(plan);
@@ -613,17 +609,6 @@ public class PlannerTest extends AbstractPlannerTest {
+ " WHERE VAL = 10) \n"
+ "WHERE VAL = 10";
- PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .build())
- .query(sql)
- .parameters(2)
- .build();
-
RelNode phys = physicalPlan(sql, publicSchema);
assertNotNull(phys);
@@ -693,12 +678,12 @@ public class PlannerTest extends AbstractPlannerTest {
+ "where d.deptno + e.deptno = 2";
PlanningContext ctx = PlanningContext.builder()
- .localNodeId(first(NODES))
- .originatingNodeId(first(NODES))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .costFactory(new IgniteCostFactory(1, 100, 1, 1))
+ .parentContext(BaseQueryContext.builder()
+ .logger(log)
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .costFactory(new IgniteCostFactory(1, 100, 1, 1))
+ .build())
.build())
.query(sql)
.build();
diff --git a/modules/core/src/main/java/org/apache/ignite/logger/NullLogger.java b/modules/core/src/main/java/org/apache/ignite/logger/NullLogger.java
new file mode 100644
index 0000000..666a4a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/logger/NullLogger.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.logger;
+
+import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Logger which does not output anything.
+ */
+public class NullLogger extends IgniteLogger {
+ /**
+ * Creates null logger.
+ */
+ public NullLogger() {
+ super(NullLogger.class);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override
+ public void info(String msg, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void info(String msg, Throwable th, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void info(Supplier<String> msgSupplier, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void info(String msg, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void debug(String msg, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void debug(String msg, Throwable th, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void debug(Supplier<String> msgSupplier, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void debug(String msg, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void warn(String msg, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void warn(String msg, Throwable th, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void warn(Supplier<String> msgSupplier, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void warn(String msg, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void error(String msg, @Nullable Throwable e) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void error(String msg, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void error(String msg, Throwable th, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void error(Supplier<String> msgSupplier, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void trace(String msg, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void trace(String msg, Throwable th, Object... params) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void trace(Supplier<String> msgSupplier, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void trace(String msg, Throwable th) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isTraceEnabled() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isDebugEnabled() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isInfoEnabled() {
+ return false;
+ }
+}