You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/09/29 12:57:46 UTC
[ignite] branch ignite-12248 updated: query cache improvements
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 54a02e1 query cache improvements
54a02e1 is described below
commit 54a02e1976e26bd3e3ab54c084a9fc8ae61e45e6
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Sep 29 15:49:44 2020 +0300
query cache improvements
---
.../query/calcite/exec/ExecutionServiceImpl.java | 160 ++++++++++-----------
.../calcite/prepare/AbstractMultiStepPlan.java | 62 +-------
.../processors/query/calcite/prepare/Cloner.java | 65 +++------
.../query/calcite/prepare/ExplainPlan.java | 5 +-
.../processors/query/calcite/prepare/Fragment.java | 2 +-
.../prepare/{QueryPlan.java => FragmentPlan.java} | 40 +++---
.../query/calcite/prepare/FragmentSplitter.java | 111 +-------------
.../{Splitter.java => IgniteRelShuttle.java} | 103 +++----------
.../query/calcite/prepare/MultiStepDmlPlan.java | 13 +-
.../query/calcite/prepare/MultiStepQueryPlan.java | 13 +-
...stractMultiStepPlan.java => QueryMappings.java} | 101 +++++--------
.../query/calcite/prepare/QueryPlan.java | 6 +-
.../processors/query/calcite/prepare/Splitter.java | 120 +---------------
13 files changed, 210 insertions(+), 591 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 6e3c0a7..59d6d49 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -28,6 +28,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitDef;
@@ -83,6 +85,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryF
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription;
+import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
@@ -366,9 +369,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
String qry,
Object[] params
) {
- PlanningContext pctx = createContext(ctx, schema, qry, params);
+ PlanningContext pctx = createContext(Commons.convert(ctx), topologyVersion(), localNodeId(), schema, qry, params);
- List<QueryPlan> qryPlans = prepareQueryPlan(pctx);
+ List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareQuery);
return executePlans(qryPlans, pctx);
}
@@ -453,12 +456,8 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private PlanningContext createContext(
- @Nullable QueryContext qryCtx,
- @Nullable String schemaName,
- String qry,
- Object[] params
- ) {
+ private PlanningContext createContext(Context parent, AffinityTopologyVersion topVer, UUID originator,
+ @Nullable String schema, String qry, Object[] params) {
RelTraitDef<?>[] traitDefs = {
ConventionTraitDef.INSTANCE,
RelCollationTraitDef.INSTANCE,
@@ -468,56 +467,23 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
return PlanningContext.builder()
.localNodeId(localNodeId())
- .parentContext(Commons.convert(qryCtx))
+ .originatingNodeId(originator)
+ .parentContext(parent)
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schemaName != null
- ? schemaHolder().schema().getSubSchema(schemaName)
+ .defaultSchema(schema != null
+ ? schemaHolder().schema().getSubSchema(schema)
: schemaHolder().schema())
.traitDefs(traitDefs)
.build())
.query(qry)
.parameters(params)
- .topologyVersion(topologyVersion())
- .logger(log)
- .build();
- }
-
- /** */
- private PlanningContext createContext(
- @Nullable String schemaName,
- UUID originatingNodeId,
- AffinityTopologyVersion topVer
- ) {
- // TODO pass to context user locale and timezone.
-
- RelTraitDef<?>[] traitDefs = {
- ConventionTraitDef.INSTANCE,
- RelCollationTraitDef.INSTANCE,
- DistributionTraitDef.INSTANCE,
- RewindabilityTraitDef.INSTANCE
- };
-
- return PlanningContext.builder()
- .localNodeId(localNodeId())
- .originatingNodeId(originatingNodeId)
- .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schemaName != null
- ? schemaHolder().schema().getSubSchema(schemaName)
- : schemaHolder().schema())
- .traitDefs(traitDefs)
- .build())
.topologyVersion(topVer)
.logger(log)
.build();
}
/** */
- private List<QueryPlan> prepareQueryPlan(PlanningContext ctx) {
- return queryPlanCache().queryPlan(ctx, new CacheKey(ctx.schemaName(), ctx.query()), this::prepare0);
- }
-
- /** */
- private List<QueryPlan> prepare0(PlanningContext ctx) {
+ private List<QueryPlan> prepareQuery(PlanningContext ctx) {
try {
String qry = ctx.query();
@@ -552,6 +518,11 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
+ private List<QueryPlan> prepareFragment(PlanningContext ctx) {
+ return ImmutableList.of(new FragmentPlan(fromJson(ctx, ctx.query())));
+ }
+
+ /** */
private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
assert single(sqlNode);
@@ -741,7 +712,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
QueryStartRequest req = new QueryStartRequest(
qryId,
pctx.schemaName(),
- fragment0.rootSerialized(),
+ fragment0.serialized(),
pctx.topologyVersion(),
fragmentDesc0,
pctx.parameters());
@@ -771,6 +742,56 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
+ private void executeFragment(UUID qryId, FragmentPlan plan, PlanningContext pctx, FragmentDescription fragmentDesc) {
+ ExecutionContext<Row> ectx = new ExecutionContext<>(taskExecutor(), pctx, qryId,
+ fragmentDesc, handler, Commons.parametersMap(pctx.parameters()));
+
+ long frId = fragmentDesc.fragmentId();
+ UUID origNodeId = pctx.originatingNodeId();
+
+ Outbox<Row> node;
+ try {
+ node = new LogicalRelImplementor<>(
+ ectx,
+ partitionService(),
+ mailboxRegistry(),
+ exchangeService(),
+ failureProcessor())
+ .go(plan.root());
+ }
+ catch (Exception ex) {
+ U.error(log, "Failed to build execution tree. ", ex);
+
+ mailboxRegistry.outboxes(qryId, frId, -1)
+ .forEach(Outbox::close);
+ mailboxRegistry.inboxes(qryId, frId, -1)
+ .forEach(Inbox::close);
+
+ try {
+ messageService().send(origNodeId, new QueryStartResponse(qryId, frId, ex));
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
+ }
+
+ return;
+ }
+
+ try {
+ messageService().send(origNodeId, new QueryStartResponse(qryId, frId));
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
+
+ node.onNodeLeft(origNodeId);
+
+ return;
+ }
+
+ node.init();
+ }
+
+ /** */
private void register(QueryInfo info) {
UUID qryId = info.ctx.queryId();
PlanningContext pctx = info.ctx.planningContext();
@@ -832,50 +853,15 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
private void onMessage(UUID nodeId, QueryStartRequest msg) {
assert nodeId != null && msg != null;
- PlanningContext ctx = createContext(msg.schema(), nodeId, msg.topologyVersion());
- ExecutionContext<Row> execCtx = new ExecutionContext<>(taskExecutor(), ctx, msg.queryId(),
- msg.fragmentDescription(), handler, Commons.parametersMap(msg.parameters()));
-
- Outbox<Row> node;
- try {
- node = new LogicalRelImplementor<>(
- execCtx,
- partitionService(),
- mailboxRegistry(),
- exchangeService(),
- failureProcessor())
- .go(fromJson(ctx, msg.root()));
- }
- catch (Exception ex) {
- U.error(log, "Failed to build execution tree. ", ex);
-
- mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
- .forEach(Outbox::close);
- mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
- .forEach(Inbox::close);
+ PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters());
- try {
- messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
- }
+ List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareFragment);
- return;
- }
+ assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT;
- try {
- messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId()));
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
+ FragmentPlan plan = (FragmentPlan)qryPlans.get(0);
- node.onNodeLeft(nodeId);
-
- return;
- }
-
- node.init();
+ executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 47aa7c8..f146e39 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -21,13 +21,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
@@ -44,12 +40,16 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
protected final List<GridQueryFieldMetadata> fieldsMeta;
/** */
+ protected final QueryMappings queryMappings;
+
+ /** */
protected Map<Long, NodesMapping> mappings;
/** */
- protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
+ protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings queryMappings) {
this.fragments = fragments;
this.fieldsMeta = fieldsMeta;
+ this.queryMappings = queryMappings;
}
/** {@inheritDoc} */
@@ -92,61 +92,11 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
/** {@inheritDoc} */
@Override public void init(MappingService mappingService, PlanningContext ctx) {
- mappings = U.newHashMap(fragments.size());
-
- RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery();
-
- for (int i = 0, j = 0; i < fragments.size();) {
- Fragment fragment = fragments.get(i);
-
- try {
- mappings.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq));
-
- i++;
- }
- catch (OptimisticPlanningException e) {
- if (++j > 3)
- throw new IgniteSQLException("Failed to map query.", e);
-
- replace(fragment, new FragmentSplitter(e.node()).go(fragment));
-
- // restart init routine.
- mappings.clear();
- i = 0;
- }
- }
+ mappings = queryMappings.map(mappingService, ctx, fragments);
}
/** */
private NodesMapping fragmentMapping(long fragmentId) {
return mappings == null ? null : mappings.get(fragmentId);
}
-
- /** */
- private void replace(Fragment fragment, List<Fragment> replacement) {
- assert !F.isEmpty(replacement);
-
- Map<Long, Long> newTargets = new HashMap<>();
-
- for (Fragment fragment0 : replacement) {
- for (IgniteReceiver remote : fragment0.remotes())
- newTargets.put(remote.exchangeId(), fragment0.fragmentId());
- }
-
- for (int i = 0; i < fragments.size(); i++) {
- Fragment fragment0 = fragments.get(i);
-
- if (fragment0 == fragment)
- fragments.set(i, F.first(replacement));
- else if (!fragment0.local()) {
- IgniteSender sender = (IgniteSender)fragment0.root();
- Long newTargetId = newTargets.get(sender.exchangeId());
-
- if (newTargetId != null)
- sender.targetFragmentId(newTargetId);
- }
- }
-
- fragments.addAll(replacement.subList(1, replacement.size()));
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 9e22c47..aeee7f5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
@@ -42,7 +41,6 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchang
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
/** */
class Cloner implements IgniteRelVisitor<IgniteRel> {
@@ -50,7 +48,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
private final RelOptCluster cluster;
/** */
- private FragmentProto curr;
+ private ImmutableList.Builder<IgniteReceiver> remotes;
Cloner(RelOptCluster cluster) {
this.cluster = cluster;
@@ -59,22 +57,31 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
/**
* Clones and associates a plan with a new cluster.
*
- * @param src Plan to clone.
+ * @param src Fragments to clone.
* @return New plan.
*/
- List<Fragment> go(List<Fragment> src) {
- assert !F.isEmpty(src);
+ public List<Fragment> go(List<Fragment> src) {
+ return Commons.transform(src, this::go);
+ }
+
+ /**
+ * Clones and associates a plan with a new cluster.
+ *
+ * @param src Fragment to clone.
+ * @return New plan.
+ */
+ public Fragment go(Fragment src) {
+ try {
+ remotes = ImmutableList.builder();
- List<Fragment> fragments = new ArrayList<>(src.size());
+ IgniteRel newRoot = visit(src.root());
+ ImmutableList<IgniteReceiver> remotes = this.remotes.build();
- for (Fragment fragment : src) {
- curr = new FragmentProto(fragment.fragmentId(), fragment.root(), fragment.rootSerialized());
- curr.root = visit(curr.root);
- fragments.add(curr.build());
- curr = null;
+ return new Fragment(src.fragmentId(), newRoot, remotes, src.serialized());
+ }
+ finally {
+ remotes = null;
}
-
- return fragments;
}
/** {@inheritDoc} */
@@ -163,7 +170,8 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(),
rel.exchangeId(), rel.sourceFragmentId());
- curr.remotes.add(receiver);
+ if (remotes != null)
+ remotes.add(receiver);
return receiver;
}
@@ -203,31 +211,4 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
@Override public IgniteRel visit(IgniteRel rel) {
return rel.accept(this);
}
-
- /** */
- private static class FragmentProto {
- /** */
- private final long id;
-
- /** */
- private IgniteRel root;
-
- /** Serialized representation. */
- private String rootSer;
-
- /** */
- private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
-
- /** */
- private FragmentProto(long id, IgniteRel root, String rootSer) {
- this.id = id;
- this.root = root;
- this.rootSer = rootSer;
- }
-
- /** */
- Fragment build() {
- return new Fragment(id, root, remotes.build(), rootSer);
- }
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java
index 6d0e3cb..b96a9ca 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java
@@ -17,9 +17,8 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.NotNull;
/**
* Query explain plan.
@@ -46,7 +45,7 @@ public class ExplainPlan implements QueryPlan {
}
/** {@inheritDoc} */
- @Override public QueryPlan clone(@Nullable PlanningContext ctx) {
+ @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
return this;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 98db988..04b57e2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -128,7 +128,7 @@ public class Fragment {
*
* @return Serialized form.
*/
- public String rootSerialized() {
+ public String serialized() {
return rootSer != null ? rootSer : (rootSer = toJson(root()));
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
similarity index 57%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
index 7f38050..dd28f96 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
@@ -17,23 +17,31 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.jetbrains.annotations.NotNull;
-/**
- *
- */
-public interface QueryPlan {
- /** Query type */
- enum Type { QUERY, DML, DDL, EXPLAIN }
+/** */
+public class FragmentPlan implements QueryPlan {
+ /** */
+ private final IgniteRel root;
+
+ /** */
+ public FragmentPlan(IgniteRel root) {
+ this.root = root;
+ }
+
+ /** */
+ public IgniteRel root() {
+ return root;
+ }
- /**
- * @return Query type.
- */
- Type type();
+ /** {@inheritDoc} */
+ @Override public Type type() {
+ return Type.FRAGMENT;
+ }
- /**
- * Clones this plan.
- * @param ctx Planner context.
- */
- QueryPlan clone(@Nullable PlanningContext ctx);
+ /** {@inheritDoc} */
+ @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
+ return new FragmentPlan(new Cloner(ctx.cluster()).visit(root));
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
index 1dbf3cb..0843557 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -26,31 +26,17 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
*/
-public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
+public class FragmentSplitter extends IgniteRelShuttle {
/** */
private final Deque<FragmentProto> stack = new LinkedList<>();
@@ -81,90 +67,7 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteFilter rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTrimExchange rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteProject rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTableModify rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteAggregate rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteMapAggregate rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReduceAggregate rel) {
- assert cutPoint != rel;
-
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteUnionAll rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSort rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSender rel) {
- assert cutPoint != rel;
-
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteIndexScan rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTableScan rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteValues rel) {
- assert cutPoint != rel;
-
- return rel;
- }
-
- /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteReceiver rel) {
- assert cutPoint != rel;
-
curr.remotes.add(rel);
return rel;
@@ -183,7 +86,7 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
/**
* Visits all children of a parent.
*/
- private IgniteRel processNode(IgniteRel rel) {
+ @Override protected IgniteRel processNode(IgniteRel rel) {
if (rel == cutPoint) {
cutPoint = null;
@@ -198,16 +101,6 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
return rel;
}
- /**
- * Visits a particular child of a parent and replaces the child if it was changed.
- */
- private void visitChild(IgniteRel parent, int i, IgniteRel child) {
- IgniteRel newChild = visit(child);
-
- if (newChild != child)
- parent.replaceInput(i, newChild);
- }
-
/** */
private IgniteRel split(IgniteRel rel) {
RelOptCluster cluster = rel.getCluster();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
similarity index 70%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 4616756..5f235c2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -17,12 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.LinkedList;
import java.util.List;
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
@@ -44,30 +39,11 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-/**
- * Splits a query into a list of query fragments.
- */
-public class Splitter implements IgniteRelVisitor<IgniteRel> {
- /** */
- private final Deque<FragmentProto> stack = new LinkedList<>();
-
- /** */
- private FragmentProto curr;
-
- /** */
- public List<Fragment> go(IgniteRel root) {
- ArrayList<Fragment> res = new ArrayList<>();
-
- stack.push(new FragmentProto(Fragment.ID_GEN.getAndIncrement(), root));
-
- while (!stack.isEmpty()) {
- curr = stack.pop();
- curr.root = visit(curr.root);
- res.add(curr.build());
- curr = null;
- }
-
- return res;
+/** */
+public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSender rel) {
+ return processNode(rel);
}
/** {@inheritDoc} */
@@ -86,11 +62,6 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSort rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
return processNode(rel);
}
@@ -101,7 +72,7 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTableModify rel) {
+ @Override public IgniteRel visit(IgniteExchange rel) {
return processNode(rel);
}
@@ -121,33 +92,38 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableModify rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteUnionAll rel) {
return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSender rel) {
+ @Override public IgniteRel visit(IgniteSort rel) {
return processNode(rel);
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteIndexScan rel) {
- return rel;
+ return processNode(rel);
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteTableScan rel) {
- return rel;
+ return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteValues rel) {
- return rel;
+ @Override public IgniteRel visit(IgniteReceiver rel) {
+ return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReceiver rel) {
- throw new AssertionError();
+ @Override public IgniteRel visit(IgniteValues rel) {
+ return processNode(rel);
}
/** {@inheritDoc} */
@@ -155,27 +131,10 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
return rel.accept(this);
}
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteExchange rel) {
- RelOptCluster cluster = rel.getCluster();
-
- long targetFragmentId = curr.id;
- long sourceFragmentId = Fragment.ID_GEN.getAndIncrement();
- long exchangeId = sourceFragmentId;
-
- IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(), exchangeId, sourceFragmentId);
- IgniteSender sender = new IgniteSender(cluster, rel.getTraitSet(), rel.getInput(), exchangeId, targetFragmentId, rel.distribution());
-
- curr.remotes.add(receiver);
- stack.push(new FragmentProto(sourceFragmentId, sender));
-
- return receiver;
- }
-
/**
* Visits all children of a parent.
*/
- private IgniteRel processNode(IgniteRel rel) {
+ protected IgniteRel processNode(IgniteRel rel) {
List<IgniteRel> inputs = Commons.cast(rel.getInputs());
for (int i = 0; i < inputs.size(); i++)
@@ -187,32 +146,10 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
/**
* Visits a particular child of a parent and replaces the child if it was changed.
*/
- private void visitChild(IgniteRel parent, int i, IgniteRel child) {
+ protected void visitChild(IgniteRel parent, int i, IgniteRel child) {
IgniteRel newChild = visit(child);
if (newChild != child)
parent.replaceInput(i, newChild);
}
-
- /** */
- private static class FragmentProto {
- /** */
- private final long id;
-
- /** */
- private IgniteRel root;
-
- /** */
- private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
-
- /** */
- private FragmentProto(long id, IgniteRel root) {
- this.id = id;
- this.root = root;
- }
-
- Fragment build() {
- return new Fragment(id, root, remotes.build());
- }
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
index 75cf46a..c36e608 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.jetbrains.annotations.NotNull;
/**
* Distributed dml plan.
@@ -38,7 +38,12 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan {
* @param fieldsMeta Fields metadata.
*/
public MultiStepDmlPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
- super(fragments, fieldsMeta);
+ this(fragments, fieldsMeta, new QueryMappings());
+ }
+
+ /** */
+ private MultiStepDmlPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings mappings) {
+ super(fragments, fieldsMeta, mappings);
}
/** {@inheritDoc} */
@@ -48,7 +53,7 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan {
/** {@inheritDoc}
* @param ctx*/
- @Override public QueryPlan clone(PlanningContext ctx) {
- return new MultiStepDmlPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMetadata());
+ @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
+ return new MultiStepDmlPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMeta, queryMappings);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
index 7b64c9b..5d88964 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.jetbrains.annotations.NotNull;
/**
* Distributed query plan.
@@ -39,7 +39,12 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan {
* @param fieldsMeta Fields metadata.
*/
public MultiStepQueryPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
- super(fragments, fieldsMeta);
+ this(fragments, fieldsMeta, new QueryMappings());
+ }
+
+ /** */
+ private MultiStepQueryPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings mappings) {
+ super(fragments, fieldsMeta, mappings);
}
/** {@inheritDoc} */
@@ -49,7 +54,7 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan {
/** {@inheritDoc}
* @param ctx*/
- @Override public QueryPlan clone(PlanningContext ctx) {
- return new MultiStepQueryPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMetadata());
+ @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
+ return new MultiStepQueryPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMeta, queryMappings);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java
similarity index 56%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java
index 47aa7c8..7c145c5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.ImmutableMap;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -31,99 +32,71 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPl
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- *
- */
-public abstract class AbstractMultiStepPlan implements MultiStepPlan {
- /** */
- protected final List<Fragment> fragments;
-
- /** */
- protected final List<GridQueryFieldMetadata> fieldsMeta;
+/** */
+public class QueryMappings {
/** */
- protected Map<Long, NodesMapping> mappings;
+ private static class Mapping {
+ /** */
+ private final AffinityTopologyVersion ver;
- /** */
- protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
- this.fragments = fragments;
- this.fieldsMeta = fieldsMeta;
- }
-
- /** {@inheritDoc} */
- @Override public List<Fragment> fragments() {
- return fragments;
- }
+ /** */
+ private final Map<Long, NodesMapping> nodeMappings;
- /** {@inheritDoc} */
- @Override public List<GridQueryFieldMetadata> fieldsMetadata() {
- return fieldsMeta;
- }
-
- /** {@inheritDoc} */
- @Override public NodesMapping fragmentMapping(Fragment fragment) {
- return fragmentMapping(fragment.fragmentId());
- }
-
- /** {@inheritDoc} */
- @Override public NodesMapping targetMapping(Fragment fragment) {
- if (fragment.local())
- return null;
-
- return fragmentMapping(((IgniteSender)fragment.root()).targetFragmentId());
+ /** */
+ private Mapping(AffinityTopologyVersion ver, Map<Long, NodesMapping> nodeMappings) {
+ this.ver = ver;
+ this.nodeMappings = nodeMappings;
+ }
}
- /** {@inheritDoc} */
- @Override public Map<Long, List<UUID>> remoteSources(Fragment fragment) {
- List<IgniteReceiver> remotes = fragment.remotes();
-
- if (F.isEmpty(remotes))
- return null;
+ /** */
+ private final AtomicReference<Mapping> mapping = new AtomicReference<>();
- HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
+ /** */
+ public Map<Long, NodesMapping> map(MappingService mappingService, PlanningContext ctx, List<Fragment> fragments) {
+ Mapping mapping = this.mapping.get();
- for (IgniteReceiver remote : remotes)
- res.put(remote.exchangeId(), fragmentMapping(remote.sourceFragmentId()).nodes());
+ if (mapping != null && Objects.equals(mapping.ver, ctx.topologyVersion()))
+ return mapping.nodeMappings;
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void init(MappingService mappingService, PlanningContext ctx) {
- mappings = U.newHashMap(fragments.size());
+ ImmutableMap.Builder<Long, NodesMapping> b = ImmutableMap.builder();
RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery();
+ boolean save = true;
for (int i = 0, j = 0; i < fragments.size();) {
Fragment fragment = fragments.get(i);
try {
- mappings.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq));
+ b.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq));
i++;
}
catch (OptimisticPlanningException e) {
+ save = false; // we mustn't save mappings for mutated fragments
+
if (++j > 3)
throw new IgniteSQLException("Failed to map query.", e);
- replace(fragment, new FragmentSplitter(e.node()).go(fragment));
+ replace(fragments, fragment, new FragmentSplitter(e.node()).go(fragment));
// restart init routine.
- mappings.clear();
+ b = ImmutableMap.builder();
i = 0;
}
}
- }
- /** */
- private NodesMapping fragmentMapping(long fragmentId) {
- return mappings == null ? null : mappings.get(fragmentId);
+ ImmutableMap<Long, NodesMapping> mappings = b.build();
+
+ if (save)
+ this.mapping.compareAndSet(mapping, new Mapping(ctx.topologyVersion(), mappings));
+
+ return mappings;
}
/** */
- private void replace(Fragment fragment, List<Fragment> replacement) {
+ private void replace(List<Fragment> fragments, Fragment fragment, List<Fragment> replacement) {
assert !F.isEmpty(replacement);
Map<Long, Long> newTargets = new HashMap<>();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
index 7f38050..1c204f0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
@@ -17,14 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.NotNull;
/**
*
*/
public interface QueryPlan {
/** Query type */
- enum Type { QUERY, DML, DDL, EXPLAIN }
+ enum Type { QUERY, FRAGMENT, DML, DDL, EXPLAIN }
/**
* @return Query type.
@@ -35,5 +35,5 @@ public interface QueryPlan {
* Clones this plan.
* @param ctx Planner context.
*/
- QueryPlan clone(@Nullable PlanningContext ctx);
+ QueryPlan clone(@NotNull PlanningContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index 4616756..464efa1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -23,31 +23,15 @@ import java.util.LinkedList;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Splits a query into a list of query fragments.
*/
-public class Splitter implements IgniteRelVisitor<IgniteRel> {
+public class Splitter extends IgniteRelShuttle {
/** */
private final Deque<FragmentProto> stack = new LinkedList<>();
@@ -71,91 +55,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteFilter rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTrimExchange rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteProject rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSort rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTableModify rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteAggregate rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteMapAggregate rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReduceAggregate rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteUnionAll rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSender rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteIndexScan rel) {
- return rel;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTableScan rel) {
- return rel;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteValues rel) {
- return rel;
- }
-
- /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteReceiver rel) {
throw new AssertionError();
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteRel rel) {
- return rel.accept(this);
- }
-
- /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteExchange rel) {
RelOptCluster cluster = rel.getCluster();
@@ -172,28 +76,6 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
return receiver;
}
- /**
- * Visits all children of a parent.
- */
- private IgniteRel processNode(IgniteRel rel) {
- List<IgniteRel> inputs = Commons.cast(rel.getInputs());
-
- for (int i = 0; i < inputs.size(); i++)
- visitChild(rel, i, inputs.get(i));
-
- return rel;
- }
-
- /**
- * Visits a particular child of a parent and replaces the child if it was changed.
- */
- private void visitChild(IgniteRel parent, int i, IgniteRel child) {
- IgniteRel newChild = visit(child);
-
- if (newChild != child)
- parent.replaceInput(i, newChild);
- }
-
/** */
private static class FragmentProto {
/** */