You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/12/02 18:43:10 UTC
[ignite] branch ignite-12248 updated: context refactoring
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 1824145 context refactoring
1824145 is described below
commit 1824145fdfdd1e78fa8d78b1cc8cbe0984a46beb
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Mon Dec 2 21:42:56 2019 +0300
context refactoring
---
.../query/calcite/CalciteQueryProcessor.java | 45 +++--
.../{RegistryImpl.java => MappingServiceImpl.java} | 59 +-----
.../cluster/TableDistributionServiceImpl.java | 87 ++++++++
...ExchangeService.java => ExchangeProcessor.java} | 2 +-
.../processors/query/calcite/exchange/Outbox.java | 18 +-
.../{LocationRegistry.java => MappingService.java} | 2 +-
...Registry.java => TableDistributionService.java} | 2 +-
.../query/calcite/prepare/DataContextImpl.java | 14 +-
.../calcite/prepare/DistributedExecution.java | 10 +-
.../query/calcite/prepare/IgnitePlanner.java | 5 +-
.../query/calcite/prepare/PlannerContext.java | 213 ++++++++++++++++++++
.../query/calcite/rel/IgniteTableScan.java | 3 +-
.../processors/query/calcite/rule/IgniteRules.java | 4 +-
.../query/calcite/rule/PlannerPhase.java | 8 +-
.../query/calcite/schema/IgniteTable.java | 31 +--
.../query/calcite/splitter/Fragment.java | 20 +-
.../query/calcite/splitter/QueryPlan.java | 4 +-
.../processors/query/calcite/splitter/Source.java | 4 +-
.../query/calcite/trait/AllTargetsFactory.java | 4 +-
.../calcite/trait/DestinationFunctionFactory.java | 4 +-
.../query/calcite/trait/HashFunctionFactory.java | 4 +-
.../query/calcite/trait/NoOpFactory.java | 4 +-
.../query/calcite/trait/RandomTargetFactory.java | 4 +-
.../query/calcite/trait/SingleTargetFactory.java | 4 +-
.../processors/query/calcite/util/Commons.java | 25 +--
.../query/calcite/CalciteQueryProcessorTest.java | 223 ++++++++++++++-------
.../query/calcite/exchange/OutboxTest.java | 5 +-
27 files changed, 548 insertions(+), 260 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 760a71c..c244f4e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.Collections;
import java.util.List;
+import java.util.function.BiFunction;
import org.apache.calcite.config.Lex;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.fun.SqlLibrary;
import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
import org.apache.calcite.sql.parser.SqlParser;
@@ -36,9 +36,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
-import org.apache.ignite.internal.processors.query.calcite.cluster.RegistryImpl;
+import org.apache.ignite.internal.processors.query.calcite.cluster.MappingServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.cluster.TableDistributionServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution;
import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder;
@@ -49,8 +51,6 @@ import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.query.calcite.util.Commons.provided;
-
/**
*
*/
@@ -110,7 +110,7 @@ public class CalciteQueryProcessor implements QueryEngine {
}
@Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String query, Object... params) throws IgniteSQLException {
- Context context = context(Commons.convert(ctx), query, params);
+ PlannerContext context = context(Commons.convert(ctx), query, params, this::buildContext);
QueryExecution execution = prepare(context);
FieldsQueryCursor<List<?>> cur = execution.execute();
return Collections.singletonList(cur);
@@ -124,16 +124,12 @@ public class CalciteQueryProcessor implements QueryEngine {
return log;
}
- public GridKernalContext context() {
- return kernalContext;
- }
-
/** */
- public IgnitePlanner planner(RelTraitDef[] traitDefs, Context ctx) {
+ public IgnitePlanner planner(RelTraitDef[] traitDefs, PlannerContext ctx0) {
FrameworkConfig cfg = Frameworks.newConfigBuilder(config())
- .defaultSchema(ctx.unwrap(SchemaPlus.class))
+ .defaultSchema(ctx0.schema())
.traitDefs(traitDefs)
- .context(ctx)
+ .context(ctx0)
.build();
return new IgnitePlanner(cfg);
@@ -145,16 +141,25 @@ public class CalciteQueryProcessor implements QueryEngine {
* @param params Query parameters.
* @return Query execution context.
*/
- Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests.
- return Contexts.chain(ctx, config.getContext(),
- Contexts.of(
- new Query(query, params),
- new RegistryImpl(kernalContext),
- provided(ctx, SchemaPlus.class, schemaHolder::schema),
- provided(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion)));
+ PlannerContext context(@NotNull Context ctx, String query, Object[] params, BiFunction<Context, Query, PlannerContext> clo) { // Package private visibility for tests.
+ return clo.apply(Contexts.chain(ctx, config.getContext()), new Query(query, params));
+ }
+
+ private PlannerContext buildContext(@NotNull Context parent, @NotNull Query query) {
+ return PlannerContext.builder()
+ .logger(log)
+ .kernalContext(kernalContext)
+ .queryProcessor(this)
+ .parentContext(parent)
+ .query(query)
+ .schema(schemaHolder.schema())
+ .topologyVersion(readyAffinityVersion())
+ .distributionService(new TableDistributionServiceImpl(kernalContext))
+ .mappingService(new MappingServiceImpl(kernalContext))
+ .build();
}
- private QueryExecution prepare(Context ctx) {
+ private QueryExecution prepare(PlannerContext ctx) {
return new DistributedExecution(ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
similarity index 68%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
index 71f8077..66c7a74 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
@@ -20,52 +20,30 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.function.ToIntFunction;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED;
/**
*
*/
-public class RegistryImpl implements DistributionRegistry, LocationRegistry {
+public class MappingServiceImpl implements MappingService {
private final GridKernalContext ctx;
- public RegistryImpl(GridKernalContext ctx) {
+ public MappingServiceImpl(GridKernalContext ctx) {
this.ctx = ctx;
}
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
-
- if (grp.isReplicated())
- return IgniteDistributions.broadcast();
-
- Object key = grp.affinity().similarAffinityKey();
-
- return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
- }
-
@Override public NodesMapping local() {
return new NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null, DEDUPLICATED);
}
@@ -153,35 +131,4 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry {
}
return true;
}
-
- private final static class AffinityFactory extends AbstractDestinationFunctionFactory {
- private final int cacheId;
- private final Object key;
-
- AffinityFactory(int cacheId, Object key) {
- this.cacheId = cacheId;
- this.key = key;
- }
-
- @Override public DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys) {
- assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
-
- List<List<UUID>> assignments = mapping.assignments();
-
- if (U.assertionsEnabled()) {
- for (List<UUID> assignment : assignments) {
- assert F.isEmpty(assignment) || assignment.size() == 1;
- }
- }
-
- ToIntFunction<Object> rowToPart = ctx.unwrap(GridKernalContext.class)
- .cache().context().cacheContext(cacheId).affinity()::partition;
-
- return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
- }
-
- @Override public Object key() {
- return key;
- }
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
new file mode 100644
index 0000000..20da342
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.cluster;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class TableDistributionServiceImpl implements TableDistributionService {
+ private final GridKernalContext ctx;
+
+ public TableDistributionServiceImpl(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
+ CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
+
+ if (grp.isReplicated())
+ return IgniteDistributions.broadcast();
+
+ Object key = grp.affinity().similarAffinityKey();
+
+ return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
+ }
+
+ private final static class AffinityFactory extends AbstractDestinationFunctionFactory {
+ private final int cacheId;
+ private final Object key;
+
+ AffinityFactory(int cacheId, Object key) {
+ this.cacheId = cacheId;
+ this.key = key;
+ }
+
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) {
+ assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
+
+ List<List<UUID>> assignments = mapping.assignments();
+
+ if (U.assertionsEnabled()) {
+ for (List<UUID> assignment : assignments) {
+ assert F.isEmpty(assignment) || assignment.size() == 1;
+ }
+ }
+
+ ToIntFunction<Object> rowToPart = ctx.kernalContext()
+ .cache().context().cacheContext(cacheId).affinity()::partition;
+
+ return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
+ }
+
+ @Override public Object key() {
+ return key;
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
index 788011a..692b4fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
*
*/
-public interface ExchangeService {
+public interface ExchangeProcessor {
void register(Outbox outbox);
void unregister(Outbox outbox);
void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId, List<?> rows);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
index 47351e2..b1cf688 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -43,7 +43,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
private final Collection<UUID> targets;
private final DestinationFunction function;
- private ExchangeService srvc;
+ private ExchangeProcessor srvc;
protected Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) {
super(Sink.noOp());
@@ -54,6 +54,14 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
this.function = function;
}
+ public void init(ExchangeProcessor srvc) {
+ this.srvc = srvc;
+
+ srvc.register(this);
+
+ signal();
+ }
+
public void acknowledge(UUID nodeId, int batchId) {
perNode.get(nodeId).acknowledge(batchId);
}
@@ -91,14 +99,6 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
return true;
}
- public void init(ExchangeService srvc) {
- this.srvc = srvc;
-
- srvc.register(this);
-
- signal();
- }
-
@Override public void end() {
for (UUID node : targets)
perNode.computeIfAbsent(node, Destination::new).end();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
index bf62302..9472cd1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
@@ -21,7 +21,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
*
*/
-public interface LocationRegistry {
+public interface MappingService {
NodesMapping local(); // returns local node with single partition
NodesMapping random(AffinityTopologyVersion topVer); // returns random distribution, partitions count depends on nodes count
NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer); // returns cache distribution
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
index 3a20908..ad710aa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
@@ -22,6 +22,6 @@ import org.apache.ignite.internal.processors.query.calcite.type.RowType;
/**
*
*/
-public interface DistributionRegistry {
+public interface TableDistributionService {
DistributionTrait distribution(int cacheId, RowType rowType);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
index 49b57ad..06aaa024 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
@@ -20,18 +20,15 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.Map;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
/**
*
*/
class DataContextImpl implements DataContext {
/** */
- private final JavaTypeFactoryImpl typeFactory;
+ private final JavaTypeFactory typeFactory;
/** */
private final SchemaPlus schema;
@@ -46,10 +43,11 @@ class DataContextImpl implements DataContext {
* @param params Parameters.
* @param ctx Query context.
*/
- DataContextImpl(Map<String, Object> params, Context ctx) {
- typeFactory = new JavaTypeFactoryImpl(ctx.unwrap(CalciteQueryProcessor.class).config().getTypeSystem());
- schema = ctx.unwrap(SchemaPlus.class);
- queryProvider = ctx.unwrap(QueryProvider.class);
+ DataContextImpl(Map<String, Object> params, PlannerContext ctx) {
+ typeFactory = ctx.typeFactory();
+ schema = ctx.schema();
+ queryProvider = ctx.queryProvider();
+
this.params = params;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
index fdaee5a..192e686 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
@@ -47,19 +45,19 @@ import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryC
*/
public class DistributedExecution implements QueryExecution {
/** */
- private final Context ctx;
+ private final PlannerContext ctx;
/**
* @param ctx Query context.
*/
- public DistributedExecution(Context ctx) {
+ public DistributedExecution(PlannerContext ctx) {
this.ctx = ctx;
}
/** {@inheritDoc} */
@Override public FieldsQueryCursor<List<?>> execute() {
- CalciteQueryProcessor proc = Objects.requireNonNull(ctx.unwrap(CalciteQueryProcessor.class));
- Query query = Objects.requireNonNull(ctx.unwrap(Query.class));
+ CalciteQueryProcessor proc = ctx.queryProcessor();
+ Query query = ctx.query();
RelTraitDef[] traitDefs = {
RelDistributionTraitDef.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 2462785..2506de8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
@@ -124,6 +125,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
.typeSystem(RelDataTypeSystem.class, IgniteTypeSystem.DEFAULT);
typeFactory = new IgniteTypeFactory(typeSystem);
+
+ Commons.plannerContext(context).planner(this);
}
private CalciteConnectionConfig connConfig() {
@@ -302,7 +305,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
ready();
RelTraitSet toTraits = targetTraits.simplify();
- RuleSet rules = plannerPhase.getRules(context);
+ RuleSet rules = plannerPhase.getRules(Commons.plannerContext(context));
input.accept(new MetaDataProviderModifier(metadataProvider));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
new file mode 100644
index 0000000..dc20880
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exchange.ExchangeProcessor;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+
+/**
+ *
+ */
+public final class PlannerContext implements Context {
+ private final Context parentContext;
+ private final Query query;
+ private final AffinityTopologyVersion topologyVersion;
+ private final SchemaPlus schema;
+ private final IgniteLogger logger;
+ private final GridKernalContext kernalContext;
+ private final CalciteQueryProcessor queryProcessor;
+ private final MappingService mappingService;
+ private final TableDistributionService distributionService;
+ private final ExchangeProcessor exchangeProcessor;
+
+ private IgnitePlanner planner;
+
+ private PlannerContext(Context parentContext, Query query, AffinityTopologyVersion topologyVersion,
+ SchemaPlus schema, IgniteLogger logger, GridKernalContext kernalContext, CalciteQueryProcessor queryProcessor, MappingService mappingService,
+ TableDistributionService distributionService, ExchangeProcessor exchangeProcessor) {
+ this.parentContext = parentContext;
+ this.query = query;
+ this.topologyVersion = topologyVersion;
+ this.schema = schema;
+ this.logger = logger;
+ this.kernalContext = kernalContext;
+ this.queryProcessor = queryProcessor;
+ this.mappingService = mappingService;
+ this.distributionService = distributionService;
+ this.exchangeProcessor = exchangeProcessor;
+ }
+
+ public Query query() {
+ return query;
+ }
+
+ public AffinityTopologyVersion topologyVersion() {
+ return topologyVersion;
+ }
+
+ public SchemaPlus schema() {
+ return schema;
+ }
+
+ public IgniteLogger logger() {
+ return logger;
+ }
+
+ public GridKernalContext kernalContext() {
+ return kernalContext;
+ }
+
+ public CalciteQueryProcessor queryProcessor() {
+ return queryProcessor;
+ }
+
+ void planner(IgnitePlanner planner) {
+ this.planner = planner;
+ }
+
+ public IgnitePlanner planner() {
+ return planner;
+ }
+
+ public MappingService mappingService() {
+ return mappingService;
+ }
+
+ public TableDistributionService distributionService() {
+ return distributionService;
+ }
+
+ public ExchangeProcessor exchangeProcessor() {
+ return exchangeProcessor;
+ }
+
+ // Helper methods
+
+ public JavaTypeFactory typeFactory() {
+ return planner.getTypeFactory();
+ }
+
+ public NodesMapping mapForLocal() {
+ return mappingService.local();
+ }
+
+ public NodesMapping mapForRandom(AffinityTopologyVersion topVer) {
+ return mappingService.random(topVer);
+ }
+
+ public NodesMapping mapForCache(int cacheId, AffinityTopologyVersion topVer) {
+ return mappingService.distributed(cacheId, topVer);
+ }
+
+ public DistributionTrait distributionTrait(int cacheId, RowType rowType) {
+ return distributionService.distribution(cacheId, rowType);
+ }
+
+ public QueryProvider queryProvider() {
+ return null; // TODO
+ }
+
+ @Override public <C> C unwrap(Class<C> aClass) {
+ if (aClass == getClass())
+ return aClass.cast(this);
+
+ return parentContext.unwrap(aClass);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Context parentContext;
+ private Query query;
+ private AffinityTopologyVersion topologyVersion;
+ private SchemaPlus schema;
+ private IgniteLogger logger;
+ private GridKernalContext kernalContext;
+ private CalciteQueryProcessor queryProcessor;
+ private MappingService mappingService;
+ private TableDistributionService distributionService;
+ private ExchangeProcessor exchangeProcessor;
+
+ public Builder parentContext(Context parentContext) {
+ this.parentContext = parentContext;
+ return this;
+ }
+
+ public Builder query(Query query) {
+ this.query = query;
+ return this;
+ }
+
+ public Builder topologyVersion(AffinityTopologyVersion topologyVersion) {
+ this.topologyVersion = topologyVersion;
+ return this;
+ }
+
+ public Builder schema(SchemaPlus schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public Builder logger(IgniteLogger logger) {
+ this.logger = logger;
+ return this;
+ }
+
+ public Builder kernalContext(GridKernalContext kernalContext) {
+ this.kernalContext = kernalContext;
+ return this;
+ }
+
+ public Builder queryProcessor(CalciteQueryProcessor queryProcessor) {
+ this.queryProcessor = queryProcessor;
+ return this;
+ }
+
+ public Builder mappingService(MappingService mappingService) {
+ this.mappingService = mappingService;
+ return this;
+ }
+
+ public Builder distributionService(TableDistributionService distributionService) {
+ this.distributionService = distributionService;
+ return this;
+ }
+
+ public Builder exchangeProcessor(ExchangeProcessor exchangeProcessor) {
+ this.exchangeProcessor = exchangeProcessor;
+ return this;
+ }
+
+ public PlannerContext build() {
+ return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, distributionService, exchangeProcessor);
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 12b7b99..c529aee 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
public final class IgniteTableScan extends TableScan implements IgniteRel {
@@ -44,6 +45,6 @@ public final class IgniteTableScan extends TableScan implements IgniteRel {
public FragmentInfo fragmentInfo() {
return getTable().unwrap(IgniteTable.class)
- .fragmentInfo(getCluster().getPlanner().getContext());
+ .fragmentInfo(Commons.plannerContext(getCluster().getPlanner().getContext()));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
index d089339..cf514d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.calcite.rule;
import com.google.common.collect.ImmutableList;
import java.util.List;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.volcano.AbstractConverter;
import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
@@ -62,6 +61,7 @@ import org.apache.calcite.rel.rules.UnionMergeRule;
import org.apache.calcite.rel.rules.UnionPullUpConstantsRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.calcite.rel.rules.ValuesReduceRule;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
/**
*
@@ -149,7 +149,7 @@ public class IgniteRules {
IgniteProjectRule.INSTANCE,
IgniteJoinRule.INSTANCE);
- public static List<RelOptRule> logicalRules(Context ctx) {
+ public static List<RelOptRule> logicalRules(PlannerContext ctx) {
return ImmutableList.<RelOptRule>builder()
.addAll(BASE_RULES)
.addAll(ABSTRACT_RULES)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
index aa82187..24938a1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.query.calcite.rule;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
/**
*
@@ -27,14 +27,14 @@ import org.apache.calcite.tools.RuleSets;
public enum PlannerPhase {
/** */
SUBQUERY_REWRITE("Sub-queries rewrites") {
- @Override public RuleSet getRules(Context ctx) {
+ @Override public RuleSet getRules(PlannerContext ctx) {
return RuleSets.ofList(IgniteRules.SUBQUERY_REWRITE_RULES);
}
},
/** */
LOGICAL("Logical planning") {
- @Override public RuleSet getRules(Context ctx) {
+ @Override public RuleSet getRules(PlannerContext ctx) {
return RuleSets.ofList(IgniteRules.logicalRules(ctx));
}
};
@@ -45,5 +45,5 @@ public enum PlannerPhase {
this.description = description;
}
- public abstract RuleSet getRules(Context ctx);
+ public abstract RuleSet getRules(PlannerContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index b14ea3b..805c455 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
@@ -26,15 +25,14 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.internal.CU;
/** */
@@ -71,30 +69,19 @@ public class IgniteTable extends AbstractTable implements TranslatableTable {
/** {@inheritDoc} */
@Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
RelOptCluster cluster = context.getCluster();
+ PlannerContext ctx = Commons.plannerContext(cluster.getPlanner().getContext());
RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(cluster.getPlanner().getContext()));
+ .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(ctx));
return new IgniteTableScan(cluster, traitSet, relOptTable);
}
- public DistributionTrait distributionTrait(Context context) {
- return distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
+ public DistributionTrait distributionTrait(PlannerContext context) {
+ return Commons.plannerContext(context).distributionTrait(CU.cacheId(cacheName), rowType);
}
- public FragmentInfo fragmentInfo(Context ctx) {
- int cacheId = CU.cacheId(cacheName);
+ public FragmentInfo fragmentInfo(PlannerContext ctx) {
+ PlannerContext ctx0 = Commons.plannerContext(ctx);
- return new FragmentInfo(locationRegistry(ctx).distributed(cacheId, topologyVersion(ctx)));
- }
-
- private LocationRegistry locationRegistry(Context ctx) {
- return ctx.unwrap(LocationRegistry.class);
- }
-
- public DistributionRegistry distributionRegistry(Context ctx) {
- return ctx.unwrap(DistributionRegistry.class);
- }
-
- private AffinityTopologyVersion topologyVersion(Context ctx) {
- return ctx.unwrap(AffinityTopologyVersion.class);
+ return new FragmentInfo(ctx0.mapForCache(CU.cacheId(cacheName), ctx0.topologyVersion()));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index d12c73a..45c4591 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -17,17 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
import com.google.common.collect.ImmutableList;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -49,11 +46,12 @@ public class Fragment implements Source {
this.root = root;
}
- public void init(Context ctx, RelMetadataQuery mq) {
+ public void init(PlannerContext ctx, RelMetadataQuery mq) {
+
FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
if (info.mapping() == null)
- mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local();
+ mapping = remote() ? ctx.mapForRandom(ctx.topologyVersion()) : ctx.mapForLocal();
else
mapping = info.mapping().deduplicate();
@@ -73,7 +71,7 @@ public class Fragment implements Source {
return exchangeId;
}
- @Override public void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) {
+ @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
assert remote();
((Sender) root).init(new TargetImpl(exchangeId, mapping, distribution));
@@ -92,12 +90,4 @@ public class Fragment implements Source {
private boolean remote() {
return root instanceof Sender;
}
-
- private LocationRegistry registry(Context ctx) {
- return Objects.requireNonNull(ctx.unwrap(LocationRegistry.class));
- }
-
- private AffinityTopologyVersion topologyVersion(Context ctx) {
- return Objects.requireNonNull(ctx.unwrap(AffinityTopologyVersion.class));
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index c03879b..b0369bd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
import java.util.List;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import org.apache.ignite.internal.processors.query.calcite.util.Edge;
@@ -39,7 +39,7 @@ public class QueryPlan {
this.fragments = fragments;
}
- public void init(Context ctx) {
+ public void init(PlannerContext ctx) {
int i = 0;
RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
index 773dee1..e0108b7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
@@ -16,9 +16,9 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
/**
@@ -41,7 +41,7 @@ public interface Source {
* @param ctx Context.
* @param mq Metadata query instance.
*/
- default void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) {
+ default void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
// No-op.
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index 2dcfe33..30ce8b8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
import java.io.ObjectStreamException;
import java.util.List;
import java.util.UUID;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
/**
*
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
public final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
public static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
- @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
List<UUID> nodes = m.nodes();
return r -> nodes;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
index 587c172..d12ec15 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
import java.io.Serializable;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
/**
*
*/
public interface DestinationFunctionFactory extends Serializable {
- DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys);
+ DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys);
Object key();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index 3c2420b..863f93c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -20,9 +20,9 @@ import java.io.ObjectStreamException;
import java.util.List;
import java.util.UUID;
import java.util.function.ToIntFunction;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
public final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
public static final DestinationFunctionFactory INSTANCE = new HashFunctionFactory();
- @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
assert m != null && !F.isEmpty(m.assignments());
int[] fields = k.toIntArray();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index d8495b6..7086277 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
/**
*
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
public final class NoOpFactory extends AbstractDestinationFunctionFactory {
public static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
- @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
return null;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index 78013d3..8a643c3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -21,9 +21,9 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
/**
*
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
public final class RandomTargetFactory extends AbstractDestinationFunctionFactory {
public static final DestinationFunctionFactory INSTANCE = new RandomTargetFactory();
- @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
List<UUID> nodes = m.nodes();
return r -> Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index 4d631fc..8a8b27c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -21,9 +21,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.F;
public final class SingleTargetFactory extends AbstractDestinationFunctionFactory {
public static final DestinationFunctionFactory INSTANCE = new SingleTargetFactory();
- @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
List<UUID> nodes = Collections.singletonList(Objects.requireNonNull(F.first(m.nodes())));
return r -> nodes;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index c6d6a6c..3a461cf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -28,7 +28,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
@@ -42,11 +41,11 @@ import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
/**
*
@@ -58,24 +57,6 @@ public final class Commons {
return ctx == null ? Contexts.empty() : Contexts.of(ctx.unwrap(Object[].class));
}
- public static <T> @Nullable T provided(Context ctx, Class<T> paramType, Supplier<T> paramSrc) {
- T param = ctx.unwrap(paramType);
-
- if (param != null)
- return null; // Provided by parent context.
-
- return paramSrc.get();
- }
-
- public static <T> T contextParam(Context ctx, Class<T> paramType, Supplier<T> paramSrc) {
- T param = ctx.unwrap(paramType);
-
- if (param != null)
- return param;
-
- return paramSrc.get();
- }
-
/** */
public static RowType rowType(GridQueryTypeDescriptor desc) {
RowType.Builder b = RowType.builder();
@@ -189,4 +170,8 @@ public final class Commons {
return set;
}
+
+ public static PlannerContext plannerContext(Context ctx) {
+ return Objects.requireNonNull(ctx.unwrap(PlannerContext.class));
+ }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 6c04c1e..644633b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -35,10 +34,11 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -68,18 +69,17 @@ import org.junit.Test;
//@WithSystemProperty(key = "calcite.debug", value = "true")
public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
+ private static GridTestKernalContext kernalContext;
private static CalciteQueryProcessor proc;
private static SchemaPlus schema;
-
- private static TestRegistry registry;
private static List<UUID> nodes;
@BeforeClass
public static void setupClass() {
+ kernalContext = new GridTestKernalContext(log);
proc = new CalciteQueryProcessor();
-
proc.setLogger(log);
- proc.start(new GridTestKernalContext(log));
+ proc.start(kernalContext);
IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
@@ -123,8 +123,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
for (int i = 0; i < 4; i++) {
nodes.add(UUID.randomUUID());
}
-
- registry = new TestRegistry();
}
@Test
@@ -136,7 +134,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.id0 + 1" +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -149,7 +147,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(query);
@@ -175,7 +173,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -188,7 +186,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(query);
@@ -212,7 +210,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
String sql = "SELECT d.id, (SELECT p.name FROM Project p WHERE p.id = d.id) name, d.projectId " +
"FROM Developer d";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -225,7 +223,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(query);
@@ -251,7 +249,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -264,7 +262,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(query);
@@ -297,7 +295,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -311,7 +309,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(query);
@@ -357,7 +355,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -371,7 +369,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -433,7 +431,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
assertNotNull(ctx);
@@ -447,7 +445,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -497,10 +495,20 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- TestRegistry registry = new TestRegistry(){
+ TableDistributionService ds = new TableDistributionService(){
@Override public DistributionTrait distribution(int cacheId, RowType rowType) {
return IgniteDistributions.broadcast();
}
+ };
+
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
@Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
@@ -512,9 +520,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
-
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
-
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
assertNotNull(ctx);
RelTraitDef[] traitDefs = {
@@ -527,7 +533,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -577,13 +583,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- TestRegistry registry = new TestRegistry(){
+ TableDistributionService ds = new TableDistributionService(){
@Override public DistributionTrait distribution(int cacheId, RowType rowType) {
if (cacheId == CU.cacheId("Project"))
return IgniteDistributions.broadcast();
return IgniteDistributions.hash(rowType.distributionKeys());
}
+ };
+
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
@Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
@@ -601,7 +617,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
assertNotNull(ctx);
@@ -615,7 +631,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -665,13 +681,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- TestRegistry registry = new TestRegistry(){
+ TableDistributionService ds = new TableDistributionService(){
@Override public DistributionTrait distribution(int cacheId, RowType rowType) {
if (cacheId == CU.cacheId("Project"))
return IgniteDistributions.broadcast();
return IgniteDistributions.hash(rowType.distributionKeys());
}
+ };
+
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
@Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
@@ -689,7 +715,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
assertNotNull(ctx);
@@ -703,7 +729,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -753,10 +779,20 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.ver0 " +
"WHERE (d.projectId + 1) > ?";
- TestRegistry registry = new TestRegistry(){
+ TableDistributionService ds = new TableDistributionService(){
@Override public DistributionTrait distribution(int cacheId, RowType rowType) {
return IgniteDistributions.broadcast();
}
+ };
+
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
@Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
@@ -769,7 +805,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
assertNotNull(ctx);
@@ -783,7 +819,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -834,13 +870,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"WHERE (d.projectId + 1) > ?";
- TestRegistry registry = new TestRegistry(){
+ TableDistributionService ds = new TableDistributionService(){
@Override public DistributionTrait distribution(int cacheId, RowType rowType) {
if (cacheId == CU.cacheId("Project"))
return IgniteDistributions.broadcast();
return IgniteDistributions.hash(rowType.distributionKeys());
}
+ };
+
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
@Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
@@ -858,7 +904,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
assertNotNull(ctx);
@@ -872,7 +918,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -923,13 +969,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"WHERE (d.projectId + 1) > ?";
- TestRegistry registry = new TestRegistry(){
+ TableDistributionService ds = new TableDistributionService() {
@Override public DistributionTrait distribution(int cacheId, RowType rowType) {
if (cacheId == CU.cacheId("Project"))
return IgniteDistributions.broadcast();
return IgniteDistributions.hash(rowType.distributionKeys());
}
+ };
+
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
+
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
@Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
@@ -946,7 +1002,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
throw new AssertionError("Unexpected cache id:" + cacheId);
}
};
- Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
assertNotNull(ctx);
@@ -960,7 +1017,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = ctx.unwrap(Query.class);
+ Query query = Commons.plannerContext(ctx).query();
assertNotNull(planner);
@@ -1011,40 +1068,58 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
return res;
}
- private static class TestRegistry implements LocationRegistry, DistributionRegistry {
- private AtomicLong idGen = new AtomicLong();
+ private PlannerContext context(Context c, Query q) {
+ MappingService ms = new MappingService() {
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+ }
- @Override public NodesMapping random(AffinityTopologyVersion topVer) {
- return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
- }
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
+ }
- @Override public NodesMapping local() {
- return new NodesMapping(select(nodes, 0), null, (byte) 0);
- }
+ @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return new NodesMapping(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 1,2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
+ if (cacheId == CU.cacheId("Project"))
+ return new NodesMapping(null, Arrays.asList(
+ select(nodes, 0,1),
+ select(nodes, 1,2),
+ select(nodes, 2,0),
+ select(nodes, 0,1),
+ select(nodes, 1,2)
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- return IgniteDistributions.hash(rowType.distributionKeys());
- }
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+ };
- @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
- if (cacheId == CU.cacheId("Developer"))
- return new NodesMapping(null, Arrays.asList(
- select(nodes, 0,1),
- select(nodes, 1,2),
- select(nodes, 2,0),
- select(nodes, 0,1),
- select(nodes, 1,2)
- ), NodesMapping.HAS_PARTITIONED_CACHES);
- if (cacheId == CU.cacheId("Project"))
- return new NodesMapping(null, Arrays.asList(
- select(nodes, 0,1),
- select(nodes, 1,2),
- select(nodes, 2,0),
- select(nodes, 0,1),
- select(nodes, 1,2)
- ), NodesMapping.HAS_PARTITIONED_CACHES);
-
- throw new AssertionError("Unexpected cache id:" + cacheId);
- }
+ TableDistributionService ds = new TableDistributionService() {
+ @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
+ return IgniteDistributions.hash(rowType.distributionKeys());
+ }
+ };
+
+ return context(c, q, ms, ds);
+ }
+
+ private PlannerContext context(Context parent, Query query, MappingService ms, TableDistributionService ds) {
+ return PlannerContext.builder()
+ .parentContext(parent)
+ .logger(log)
+ .kernalContext(kernalContext)
+ .queryProcessor(proc)
+ .query(query)
+ .schema(schema)
+ .topologyVersion(AffinityTopologyVersion.NONE)
+ .distributionService(ds)
+ .mappingService(ms)
+ .build();
}
}
\ No newline at end of file
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
index 89efa7c..a08142c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -5,7 +5,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import org.apache.calcite.plan.Contexts;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
@@ -38,7 +37,7 @@ public class OutboxTest extends GridCommonAbstractTest {
NodesMapping mapping = new NodesMapping(Collections.singletonList(nodeId), null, NodesMapping.DEDUPLICATED);
targets = mapping.nodes();
- func = SingleTargetFactory.INSTANCE.create(Contexts.empty(), mapping, ImmutableIntList.of());
+ func = SingleTargetFactory.INSTANCE.create(null, mapping, ImmutableIntList.of());
}
@@ -100,7 +99,7 @@ public class OutboxTest extends GridCommonAbstractTest {
assertEquals(EndMarker.INSTANCE, F.last(exch.lastBatch));
}
- private static class TestExchangeService implements ExchangeService {
+ private static class TestExchangeService implements ExchangeProcessor {
private boolean registered;
private boolean unregistered;
private List<Integer> ids = new ArrayList<>();