You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/19 08:40:57 UTC
[ignite] branch sql-calcite updated: IGNITE-14281 Calcite.
Colocated tables are considered non colocated
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new faf2c5e IGNITE-14281 Calcite. Colocated tables are considered non colocated
faf2c5e is described below
commit faf2c5e319e2beccb5f0000d38476d1f59a4a646
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Fri Mar 19 11:40:36 2021 +0300
IGNITE-14281 Calcite. Colocated tables are considered non colocated
---
.../query/calcite/prepare/PlannerPhase.java | 5 +-
.../query/calcite/rel/AbstractIgniteJoin.java | 124 +++++----
.../query/calcite/rel/IgniteAggregateBase.java | 2 +-
.../query/calcite/rel/IgniteMergeJoin.java | 32 +--
.../calcite/rule/logical/FilterScanMergeRule.java | 9 +-
.../query/calcite/schema/TableDescriptorImpl.java | 37 ++-
.../query/calcite/trait/DistributionFunction.java | 7 +-
.../query/calcite/trait/DistributionTrait.java | 35 +--
.../query/calcite/trait/IgniteDistributions.java | 10 +
.../processors/query/calcite/trait/TraitUtils.java | 30 ++-
.../calcite/AbstractBasicIntegrationTest.java | 2 +-
.../query/calcite/CalciteQueryProcessorTest.java | 60 +++++
.../processors/query/calcite/QueryChecker.java | 1 +
.../query/calcite/planner/AbstractPlannerTest.java | 14 +
.../calcite/planner/JoinColocationPlannerTest.java | 288 +++++++++++++++++++++
.../apache/ignite/testsuites/PlannerTestSuite.java | 2 +
.../processors/query/GridQueryProcessor.java | 2 +-
.../processors/query/h2/opt/GridH2Table.java | 15 --
18 files changed, 514 insertions(+), 161 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 58d8667..6f21db7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -155,15 +155,16 @@ public enum PlannerPhase {
FilterScanMergeRule.INDEX_SCAN,
LogicalOrToUnionRule.INSTANCE,
+
CorrelatedNestedLoopJoinRule.INSTANCE,
+ NestedLoopJoinConverterRule.INSTANCE,
+ MergeJoinConverterRule.INSTANCE,
ValuesConverterRule.INSTANCE,
LogicalScanConverterRule.INDEX_SCAN,
LogicalScanConverterRule.TABLE_SCAN,
HashAggregateConverterRule.INSTANCE,
SortAggregateConverterRule.INSTANCE,
- MergeJoinConverterRule.INSTANCE,
- NestedLoopJoinConverterRule.INSTANCE,
ProjectConverterRule.INSTANCE,
FilterConverterRule.INSTANCE,
TableModifyConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
index cb54815..956ae76 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
@@ -18,9 +18,10 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableList;
@@ -38,8 +39,10 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -50,12 +53,11 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.rel.core.JoinRelType.INNER;
-import static org.apache.calcite.rel.core.JoinRelType.LEFT;
import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
import static org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdRowCount.joinRowCount;
import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
+import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
/** */
@@ -110,27 +112,23 @@ public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgni
RewindabilityTrait leftRewindability = TraitUtils.rewindability(left);
RewindabilityTrait rightRewindability = TraitUtils.rewindability(right);
- RelTraitSet outTraits, leftTraits, rightTraits;
+ List<Pair<RelTraitSet, List<RelTraitSet>>> pairs = new ArrayList<>();
- if (leftRewindability.rewindable() && rightRewindability.rewindable()) {
- outTraits = nodeTraits.replace(RewindabilityTrait.REWINDABLE);
- leftTraits = left.replace(RewindabilityTrait.REWINDABLE);
- rightTraits = right.replace(RewindabilityTrait.REWINDABLE);
- }
- else {
- outTraits = nodeTraits.replace(RewindabilityTrait.ONE_WAY);
- leftTraits = left.replace(RewindabilityTrait.ONE_WAY);
- rightTraits = right.replace(RewindabilityTrait.ONE_WAY);
- }
+ pairs.add(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+ ImmutableList.of(left.replace(RewindabilityTrait.ONE_WAY), right.replace(RewindabilityTrait.ONE_WAY))));
- return ImmutableList.of(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+ if (leftRewindability.rewindable() && rightRewindability.rewindable())
+ pairs.add(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE),
+ ImmutableList.of(left.replace(RewindabilityTrait.REWINDABLE), right.replace(RewindabilityTrait.REWINDABLE))));
+
+ return ImmutableList.copyOf(pairs);
}
/** {@inheritDoc} */
@Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Tere are several rules:
+ // There are several rules:
// 1) any join is possible on broadcast or single distribution
- // 2) hash distributed join is possible when join keys equal to source distribution keys
+ // 2) hash distributed join is possible when join keys are a superset of the source distribution keys
// 3) hash and broadcast distributed tables can be joined when join keys equal to hash
// distributed table distribution keys and:
// 3.1) it's a left join and a hash distributed table is at left
@@ -144,71 +142,55 @@ public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgni
IgniteDistribution leftDistr = TraitUtils.distribution(left);
IgniteDistribution rightDistr = TraitUtils.distribution(right);
- RelTraitSet outTraits, leftTraits, rightTraits;
+ IgniteDistribution left2rightProjectedDistr = leftDistr.apply(buildProjectionMapping(true));
+ IgniteDistribution right2leftProjectedDistr = rightDistr.apply(buildProjectionMapping(false));
+
+ RelTraitSet outTraits;
+ RelTraitSet leftTraits;
+ RelTraitSet rightTraits;
- if (leftDistr == broadcast() || rightDistr == broadcast()) {
+ if (leftDistr == broadcast() && rightDistr == broadcast()) {
outTraits = nodeTraits.replace(broadcast());
leftTraits = left.replace(broadcast());
rightTraits = right.replace(broadcast());
-
- res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
}
-
- if (leftDistr == single() || rightDistr == single()) {
+ else {
outTraits = nodeTraits.replace(single());
leftTraits = left.replace(single());
rightTraits = right.replace(single());
-
- res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
}
- if (!F.isEmpty(joinInfo.pairs())) {
- Set<DistributionFunction> functions = new HashSet<>();
+ res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
- if (leftDistr.getType() == HASH_DISTRIBUTED
- && Objects.equals(joinInfo.leftKeys, leftDistr.getKeys()))
- functions.add(leftDistr.function());
+ if (F.isEmpty(joinInfo.pairs()))
+ return ImmutableList.copyOf(res);
- if (rightDistr.getType() == HASH_DISTRIBUTED
- && Objects.equals(joinInfo.rightKeys, rightDistr.getKeys()))
- functions.add(rightDistr.function());
+ if (leftDistr.getType() == HASH_DISTRIBUTED && left2rightProjectedDistr != random()) {
+ outTraits = nodeTraits.replace(leftDistr);
+ leftTraits = left.replace(leftDistr);
+ rightTraits = right.replace(left2rightProjectedDistr);
- functions.add(DistributionFunction.hash());
-
- for (DistributionFunction function : functions) {
- leftTraits = left.replace(hash(joinInfo.leftKeys, function));
- rightTraits = right.replace(hash(joinInfo.rightKeys, function));
+ res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+ }
- // TODO distribution multitrait support
- outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, function));
- res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+ if (rightDistr.getType() == HASH_DISTRIBUTED && right2leftProjectedDistr != random()) {
+ outTraits = nodeTraits.replace(rightDistr);
+ leftTraits = left.replace(right2leftProjectedDistr);
+ rightTraits = right.replace(rightDistr);
- outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, function));
- res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+ res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+ }
- if (joinType == INNER || joinType == LEFT) {
- outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, function));
- leftTraits = left.replace(hash(joinInfo.leftKeys, function));
- rightTraits = right.replace(broadcast());
+ leftTraits = left.replace(hash(joinInfo.leftKeys, DistributionFunction.hash()));
+ rightTraits = right.replace(hash(joinInfo.rightKeys, DistributionFunction.hash()));
- res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
- }
+ outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, DistributionFunction.hash()));
+ res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
- if (joinType == INNER || joinType == RIGHT) {
- outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, function));
- leftTraits = left.replace(broadcast());
- rightTraits = right.replace(hash(joinInfo.rightKeys, function));
+ outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, DistributionFunction.hash()));
+ res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
- res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
- }
- }
- }
-
- if (!res.isEmpty())
- return res;
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(single()),
- ImmutableList.of(left.replace(single()), right.replace(single()))));
+ return ImmutableList.copyOf(res);
}
/** {@inheritDoc} */
@@ -316,4 +298,20 @@ public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgni
}
return true;
}
+
+ /** Creates a mapping from the left join keys to the right ones, or vice versa, depending on {@code left2Right}. */
+ protected Mappings.TargetMapping buildProjectionMapping(boolean left2Right) {
+ ImmutableIntList sourceKeys = left2Right ? joinInfo.leftKeys : joinInfo.rightKeys;
+ ImmutableIntList targetKeys = left2Right ? joinInfo.rightKeys : joinInfo.leftKeys;
+
+ Map<Integer, Integer> keyMap = new HashMap<>();
+ for (int i = 0; i < joinInfo.leftKeys.size(); i++)
+ keyMap.put(sourceKeys.get(i), targetKeys.get(i));
+
+ return Mappings.target(
+ keyMap,
+ (left2Right ? left : right).getRowType().getFieldCount(),
+ (left2Right ? right : left).getRowType().getFieldCount()
+ );
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
index d2a40a4..32ef760 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
@@ -183,7 +183,7 @@ public abstract class IgniteAggregateBase extends IgniteAggregate implements Tra
//Check that group by contains all key columns
if (groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.inverseMapping(
+ Mappings.TargetMapping mapping = Commons.mapping(
groupSet, getInput().getRowType().getFieldCount());
IgniteDistribution outDistr = distribution.apply(mapping);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index ecddf42..38b6811 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -31,6 +31,7 @@ import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
@@ -137,36 +138,17 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
List<Integer> newLeftCollation, newRightCollation;
- if (isPrefix(leftCollation.getKeys(), joinInfo.leftKeys)) { // preserve left collation
- newLeftCollation = new ArrayList<>(leftCollation.getKeys());
+ if (isPrefix(leftCollation.getKeys(), joinInfo.leftKeys)) // preserve left collation
+ rightCollation = leftCollation.apply(buildProjectionMapping(true));
- Map<Integer, Integer> leftToRight = joinInfo.pairs().stream()
- .collect(Collectors.toMap(p -> p.source, p -> p.target));
+ else if (isPrefix(rightCollation.getKeys(), joinInfo.rightKeys))// preserve right collation
+ leftCollation = rightCollation.apply(buildProjectionMapping(false));
- newRightCollation = newLeftCollation.stream()
- .limit(joinInfo.pairs().size())
- .map(leftToRight::get)
- .collect(Collectors.toList());
- }
- else if (isPrefix(rightCollation.getKeys(), joinInfo.rightKeys)) { // preserve right collation
- newRightCollation = new ArrayList<>(rightCollation.getKeys());
-
- Map<Integer, Integer> rightToLeft = joinInfo.pairs().stream()
- .collect(Collectors.toMap(p -> p.target, p -> p.source));
-
- newLeftCollation = newRightCollation.stream()
- .limit(joinInfo.pairs().size())
- .map(rightToLeft::get)
- .collect(Collectors.toList());
- }
else { // generate new collations
- newLeftCollation = new ArrayList<>(joinInfo.leftKeys);
- newRightCollation = new ArrayList<>(joinInfo.rightKeys);
+ leftCollation = RelCollations.of(joinInfo.leftKeys);
+ rightCollation = RelCollations.of(joinInfo.rightKeys);
}
- leftCollation = createCollation(newLeftCollation);
- rightCollation = createCollation(newRightCollation);
-
return ImmutableList.of(
Pair.of(
nodeTraits.replace(leftCollation),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
index d3f033c..284d92d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
@@ -103,12 +103,6 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
}
/** {@inheritDoc} */
- @Override public boolean matches(RelOptRuleCall call) {
- T rel = call.rel(1);
- return rel.condition() == null;
- }
-
- /** {@inheritDoc} */
@Override public void onMatch(RelOptRuleCall call) {
LogicalFilter filter = call.rel(0);
T scan = call.rel(1);
@@ -119,6 +113,9 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
RexNode condition = filter.getCondition();
RexNode remaining = null;
+ if (scan.condition() != null)
+ condition = RexUtil.composeConjunction(builder, F.asList(scan.condition(), condition));
+
if (scan.projects() != null) {
IgniteTypeFactory typeFactory = Commons.typeFactory(scan);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index c14a488..2d3881c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
@@ -36,6 +38,7 @@ import org.apache.calcite.schema.ColumnStrategy;
import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -101,7 +104,7 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
private final int valField;
/** */
- private final int affField;
+ private final ImmutableIntList affFields;
/** */
private final ImmutableBitSet insertFields;
@@ -129,16 +132,9 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
int keyField = QueryUtils.KEY_COL;
int valField = QueryUtils.VAL_COL;
- int affField = QueryUtils.KEY_COL;
for (String field : fields) {
- if (Objects.equals(field, typeDesc.affinityKey()))
- affField = descriptors.size();
-
if (Objects.equals(field, typeDesc.keyFieldAlias())) {
- if (typeDesc.affinityKey() == null)
- affField = descriptors.size();
-
keyField = descriptors.size();
virtualFields.set(0);
@@ -165,12 +161,27 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
for (ColumnDescriptor descriptor : descriptors)
descriptorsMap.put(descriptor.name(), descriptor);
- if (TypeUtils.isConvertableType(descriptors.get(affField).storageType()))
- affField = -1;
+ List<Integer> affFields = new ArrayList<>();
+ if (!F.isEmpty(typeDesc.affinityKey()))
+ affFields.add(descriptorsMap.get(typeDesc.affinityKey()).fieldIndex());
+ else if (!F.isEmpty(typeDesc.keyFieldAlias()))
+ affFields.add(descriptorsMap.get(typeDesc.keyFieldAlias()).fieldIndex());
+ else {
+ affFields.addAll(
+ descriptors.stream()
+ .filter(desc -> typeDesc.primaryKeyFields().contains(desc.name()))
+ .map(ColumnDescriptor::fieldIndex)
+ .collect(Collectors.toList())
+ );
+ }
+
+ if (affFields.stream().map(descriptors::get).map(ColumnDescriptor::storageType)
+ .anyMatch(TypeUtils::isConvertableType))
+ affFields.clear();
this.keyField = keyField;
this.valField = valField;
- this.affField = affField;
+ this.affFields = ImmutableIntList.copyOf(affFields);
this.descriptors = descriptors.toArray(DUMMY);
this.descriptorsMap = descriptorsMap;
@@ -197,10 +208,10 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
@Override public IgniteDistribution distribution() {
if (affinityIdentity == null)
return IgniteDistributions.broadcast();
- else if (affField == -1)
+ else if (affFields.isEmpty())
return IgniteDistributions.random();
else
- return IgniteDistributions.affinity(affField, cacheInfo.cacheId(), affinityIdentity);
+ return IgniteDistributions.affinity(affFields, cacheInfo.cacheId(), affinityIdentity);
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index 6d79018..b1724a2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -141,11 +141,8 @@ public abstract class DistributionFunction {
if (f0 == f1 || f0.name() == f1.name())
return true;
- if (f0 instanceof AffinityDistribution && f1 instanceof AffinityDistribution &&
- Objects.equals(((AffinityDistribution)f0).identity(), ((AffinityDistribution)f1).identity()))
- return true;
-
- return false;
+ return f0 instanceof AffinityDistribution && f1 instanceof AffinityDistribution &&
+ Objects.equals(((AffinityDistribution)f0).identity(), ((AffinityDistribution)f1).identity());
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index fb5a9ee..7c1f4a0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.util.List;
import java.util.Objects;
import com.google.common.collect.Ordering;
import org.apache.calcite.plan.RelMultipleTrait;
@@ -24,6 +25,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
@@ -150,35 +152,14 @@ public final class DistributionTrait implements IgniteDistribution {
if (getType() != HASH_DISTRIBUTED)
return this;
- if (mapping.getTargetCount() < keys.size())
- return IgniteDistributions.random();
-
- int[] map = new int[mapping.getSourceCount()];
- int[] res = new int[keys.size()];
-
- for (int i = 0; i < keys.size(); i++)
- map[keys.getInt(i)] = i + 1;
-
- for (int i = 0, found = 0; i < mapping.getTargetCount(); i++) {
- int source = mapping.getSourceOpt(i);
-
- if (source == -1)
- continue;
-
- int keyPos = map[source] - 1;
-
- if (keyPos == -1)
- continue;
-
- res[keyPos] = i;
-
- if (++found == keys.size())
- return IgniteDistributions.hash(ImmutableIntList.of(res), function);
-
- map[source] = 0;
+ for (int key : keys) {
+ if (mapping.getTargetOpt(key) == -1)
+ return IgniteDistributions.random(); // Some distribution keys are not mapped => any.
}
- return IgniteDistributions.random();
+ List<Integer> res = Mappings.apply2((Mapping) mapping, keys);
+
+ return IgniteDistributions.hash(ImmutableIntList.copyOf(res), function);
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 0690124..8af167b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -87,6 +87,16 @@ public class IgniteDistributions {
}
/**
+ * @param keys Affinity keys.
+ * @param cacheId Affinity cache ID.
+ * @param identity Affinity identity key.
+ * @return Affinity distribution.
+ */
+ public static IgniteDistribution affinity(ImmutableIntList keys, int cacheId, Object identity) {
+ return hash(keys, DistributionFunction.affinity(cacheId, identity));
+ }
+
+ /**
* @param keys Distribution keys.
* @return Hash distribution.
*/
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 602cb78..6cd77f1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
@@ -43,6 +46,7 @@ import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ControlFlowException;
@@ -64,7 +68,6 @@ import static java.util.Collections.singletonList;
import static org.apache.calcite.plan.RelOptUtil.permutationPushDownProject;
import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.rel.core.Project.getPartialMapping;
import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.any;
import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
@@ -142,6 +145,11 @@ public class TraitUtils {
if (fromTrait.satisfies(toTrait))
return rel;
+ // Right now we cannot create a multi-column affinity
+ // key object, so this conversion is impossible.
+ if (toTrait.function().affinity() && toTrait.getKeys().size() > 1)
+ return null;
+
RelTraitSet traits = rel.getTraitSet().replace(toTrait);
if (fromTrait.getType() == BROADCAST_DISTRIBUTED && toTrait.getType() == HASH_DISTRIBUTED)
return new IgniteTrimExchange(rel.getCluster(), traits, rel, toTrait);
@@ -365,7 +373,7 @@ public class TraitUtils {
if (distribution.getType() != HASH_DISTRIBUTED)
return distribution;
- Mappings.TargetMapping mapping = getPartialMapping(inputRowType.getFieldCount(), projects);
+ Mappings.TargetMapping mapping = createProjectionMapping(inputRowType.getFieldCount(), projects);
return distribution.apply(mapping);
}
@@ -489,6 +497,24 @@ public class TraitUtils {
);
}
+ /**
+ * Creates a mapping from the provided projects that maps a source column index
+ * to its index in the row produced by applying the projections.
+ *
+ * @param inputFieldCount Size of a source row.
+ * @param projects Projections.
+ */
+ private static Mappings.TargetMapping createProjectionMapping(int inputFieldCount, List<? extends RexNode> projects) {
+ Map<Integer, Integer> src2target = new HashMap<>();
+
+ for (Ord<RexNode> exp : Ord.<RexNode>zip(projects)) {
+ if (exp.e instanceof RexInputRef)
+ src2target.putIfAbsent(((RexInputRef) exp.e).getIndex(), exp.i);
+ }
+
+ return Mappings.target(src -> src2target.getOrDefault(src, -1), inputFieldCount, projects.size());
+ }
+
/** */
private static class PropagationContext {
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
index ceec400..29f7786 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
@@ -42,7 +42,7 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
startGrids(3);
- client = startClientGrid();
+ client = startClientGrid("client");
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index ac1b6ca0..c1a1062 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
@@ -38,9 +40,14 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -789,6 +796,59 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertNull(row);
}
+    /**
+     * Creates two tables with an object cache key (one described by explicit query entity fields,
+     * another one built from the {@code Key} and {@code String} classes) and checks which columns
+     * the distribution function of each table reports as its keys.
+     */
+    @Test
+    public void testTableDistributionKeysForComplexKeyObject() {
+        class MyKey {
+            int id1;
+
+            int id2;
+        }
+
+        LinkedHashMap<String, String> cols = new LinkedHashMap<>();
+        cols.put("ID1", Integer.class.getName());
+        cols.put("ID2", Integer.class.getName());
+        cols.put("VAL", String.class.getName());
+
+        // Table with explicitly listed key fields.
+        QueryEntity explicitEntity = new QueryEntity(MyKey.class.getName(), String.class.getName())
+            .setTableName("MY_TBL_1")
+            .setFields(cols)
+            .setKeyFields(ImmutableSet.of("ID1", "ID2"))
+            .setValueFieldName("VAL");
+
+        CacheConfiguration<MyKey, String> firstCacheCfg = new CacheConfiguration<MyKey, String>("test_cache_1")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(explicitEntity))
+            .setBackups(2);
+
+        client.getOrCreateCache(firstCacheCfg);
+
+        // Table whose query entity is built from the key and value classes.
+        QueryEntity clsEntity = new QueryEntity(Key.class, String.class)
+            .setTableName("MY_TBL_2");
+
+        CacheConfiguration<Key, String> secondCacheCfg = new CacheConfiguration<Key, String>("test_cache_2")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(clsEntity))
+            .setBackups(2);
+
+        client.getOrCreateCache(secondCacheCfg);
+
+        CalciteQueryProcessor proc = Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
+
+        Map<String, IgniteSchema> allSchemas = GridTestUtils.getFieldValue(proc, "schemaHolder", "igniteSchemas");
+
+        Map<String, IgniteTable> tables = GridTestUtils.getFieldValue(allSchemas.get("PUBLIC"), "tblMap");
+
+        assertEquals(ImmutableIntList.of(2, 3), tables.get("MY_TBL_1").descriptor().distribution().getKeys());
+        assertEquals(ImmutableIntList.of(3), tables.get("MY_TBL_2").descriptor().distribution().getKeys());
+    }
+
/** for test purpose only. */
public void testThroughput() {
IgniteCache<Integer, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index 56f880e..c50b1ea 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.util.typedef.F;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 4ec8f16..d172fad 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -437,6 +437,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** */
abstract static class TestTable implements IgniteTable {
/** */
+ private final String name;
+
+ /** */
private final RelProtoDataType protoType;
/** */
@@ -460,9 +463,15 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** */
TestTable(RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
+ this(UUID.randomUUID().toString(), type, rewindable, rowCnt);
+ }
+
+ /** */
+ TestTable(String name, RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
protoType = RelDataTypeImpl.proto(type);
this.rewindable = rewindable;
this.rowCnt = rowCnt;
+ this.name = name;
}
/** {@inheritDoc} */
@@ -604,6 +613,11 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
@Override public void removeIndex(String idxName) {
throw new AssertionError();
}
+
+ /**
+ * @return Name this table was created with: either the one passed to the constructor explicitly
+ * or the random UUID string generated by the constructor that takes no name.
+ */
+ public String name() {
+ return name;
+ }
}
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
new file mode 100644
index 0000000..e378766
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite to verify join colocation.
+ */
+public class JoinColocationPlannerTest extends AbstractPlannerTest {
+ /** Type factory used to build the row types of the test tables. */
+ private static final IgniteTypeFactory TYPE_FACTORY = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ /** Size passed for a test table when the caller does not request an explicit one. */
+ private static final int DEFAULT_TBL_SIZE = 500_000;
+
+    /**
+     * Self-join of a table distributed by a single-column affinity key: the join must stay
+     * colocated, i.e. both inputs of the merge join are plain index scans.
+     */
+    @Test
+    public void joinSameTableSimpleAff() throws Exception {
+        IgniteDistribution aff = IgniteDistributions.affinity(0, "default", "hash");
+
+        TestTable testTbl = createTable("TEST_TBL", aff, "ID", Integer.class, "VAL", String.class);
+
+        IgniteIndex pk = new IgniteIndex(RelCollations.of(0), "PK", null, testTbl);
+
+        testTbl.addIndex(pk);
+
+        RelNode plan = physicalPlan(
+            "select count(*) from TEST_TBL t1 join TEST_TBL t2 on t1.id = t2.id",
+            createSchema(testTbl),
+            "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"
+        );
+
+        IgniteMergeJoin mergeJoin = findFirstNode(plan, byClass(IgniteMergeJoin.class));
+
+        String errMsg = "Invalid plan:\n" + RelOptUtil.toString(plan);
+
+        assertThat(errMsg, mergeJoin, notNullValue());
+        assertThat(errMsg, mergeJoin.distribution().function().affinity(), is(true));
+        assertThat(errMsg, mergeJoin.getLeft(), instanceOf(IgniteIndexScan.class));
+        assertThat(errMsg, mergeJoin.getRight(), instanceOf(IgniteIndexScan.class));
+    }
+
+    /**
+     * Self-join of a table distributed by a two-column affinity key, joined on both key columns:
+     * the join must stay colocated, i.e. both inputs of the merge join are plain index scans.
+     */
+    @Test
+    public void joinSameTableComplexAff() throws Exception {
+        ImmutableIntList keyCols = ImmutableIntList.of(0, 1);
+
+        IgniteDistribution aff = IgniteDistributions.affinity(keyCols, CU.cacheId("default"), "hash");
+
+        TestTable testTbl = createTable("TEST_TBL", aff,
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class);
+
+        IgniteIndex pk = new IgniteIndex(RelCollations.of(keyCols), "PK", null, testTbl);
+
+        testTbl.addIndex(pk);
+
+        RelNode plan = physicalPlan(
+            "select count(*) from TEST_TBL t1 join TEST_TBL t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2",
+            createSchema(testTbl),
+            "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"
+        );
+
+        IgniteMergeJoin mergeJoin = findFirstNode(plan, byClass(IgniteMergeJoin.class));
+
+        String errMsg = "Invalid plan:\n" + RelOptUtil.toString(plan);
+
+        assertThat(errMsg, mergeJoin, notNullValue());
+        assertThat(errMsg, mergeJoin.distribution().function().affinity(), is(true));
+        assertThat(errMsg, mergeJoin.getLeft(), instanceOf(IgniteIndexScan.class));
+        assertThat(errMsg, mergeJoin.getRight(), instanceOf(IgniteIndexScan.class));
+    }
+
+    /**
+     * Re-hashing by a simple affinity is possible, so the bigger table with the complex affinity
+     * is expected to be sent to the smaller table with the simple one: the plan must contain exactly
+     * one affinity exchange, placed right above the index scan of the complex-affinity table.
+     */
+    @Test
+    public void joinComplexToSimpleAff() throws Exception {
+        IgniteDistribution complexAff =
+            IgniteDistributions.affinity(ImmutableIntList.of(0, 1), CU.cacheId("default"), "hash");
+
+        TestTable complexTbl = createTable("COMPLEX_TBL", 2 * DEFAULT_TBL_SIZE, complexAff,
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class);
+
+        complexTbl.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, complexTbl));
+
+        IgniteDistribution simpleAff = IgniteDistributions.affinity(0, "default", "hash");
+
+        TestTable simpleTbl = createTable("SIMPLE_TBL", DEFAULT_TBL_SIZE, simpleAff,
+            "ID", Integer.class,
+            "VAL", String.class);
+
+        simpleTbl.addIndex(new IgniteIndex(RelCollations.of(0), "PK", null, simpleTbl));
+
+        RelNode plan = physicalPlan(
+            "select count(*) from COMPLEX_TBL t1 join SIMPLE_TBL t2 on t1.id1 = t2.id",
+            createSchema(complexTbl, simpleTbl),
+            "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"
+        );
+
+        IgniteMergeJoin mergeJoin = findFirstNode(plan, byClass(IgniteMergeJoin.class));
+
+        String errMsg = "Invalid plan:\n" + RelOptUtil.toString(plan);
+
+        assertThat(errMsg, mergeJoin, notNullValue());
+        assertThat(errMsg, mergeJoin.distribution().function().affinity(), is(true));
+
+        List<IgniteExchange> affExchanges = findNodes(plan, rel -> rel instanceof IgniteExchange
+            && ((IgniteRel)rel).distribution().function().affinity());
+
+        assertThat(errMsg, affExchanges, hasSize(1));
+
+        // Safe to dereference only now that the size has been verified.
+        RelNode rehashedInput = affExchanges.get(0).getInput(0);
+
+        assertThat(errMsg, rehashedInput, instanceOf(IgniteIndexScan.class));
+        assertThat(errMsg, rehashedInput.getTable().unwrap(TestTable.class), equalTo(complexTbl));
+    }
+
+    /**
+     * Re-hashing for complex affinity is not supported.
+     * <p>
+     * Two tables share the same columns, but their affinity keys list those columns in a different
+     * order ({@code (0, 1)} vs {@code (1, 0)}). The plan of their join must therefore contain no
+     * exchange with an affinity distribution.
+     */
+    @Test
+    public void joinComplexToComplexAffWithDifferentOrder() throws Exception {
+        TestTable complexTblDirect = createTable(
+            "COMPLEX_TBL_DIRECT",
+            IgniteDistributions.affinity(ImmutableIntList.of(0, 1), CU.cacheId("default"), "hash"),
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class
+        );
+
+        complexTblDirect.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, complexTblDirect));
+
+        TestTable complexTblIndirect = createTable(
+            "COMPLEX_TBL_INDIRECT",
+            IgniteDistributions.affinity(ImmutableIntList.of(1, 0), CU.cacheId("default"), "hash"),
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class
+        );
+
+        complexTblIndirect.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, complexTblIndirect));
+
+        IgniteSchema schema = createSchema(complexTblDirect, complexTblIndirect);
+
+        String sql = "select count(*) " +
+            "from COMPLEX_TBL_DIRECT t1 " +
+            "join COMPLEX_TBL_INDIRECT t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+
+        // The predicate selects IgniteExchange nodes, so the result must be typed accordingly:
+        // otherwise a found exchange would surface as a ClassCastException rather than as the
+        // assertion failure below with the plan dump attached.
+        IgniteExchange exchange = findFirstNode(phys, node -> node instanceof IgniteExchange
+            && ((IgniteRel)node).distribution().function().affinity());
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, exchange, nullValue());
+    }
+
+    /**
+     * Creates a test table of the default size ({@link #DEFAULT_TBL_SIZE}).
+     *
+     * @param name Name of the table.
+     * @param distr Distribution of the table.
+     * @param fields Column definitions as a flat sequence of name/type pairs: a {@link String} column
+     *      name followed by the {@link Class} of that column.
+     *      E.g. {@code createTable("MY_TABLE", distribution, "ID", Integer.class, "VAL", String.class)}.
+     * @return Instance of the {@link TestTable}.
+     */
+    private static TestTable createTable(String name, IgniteDistribution distr, Object... fields) {
+        return createTable(name, DEFAULT_TBL_SIZE, distr, fields);
+    }
+
+    /**
+     * Creates a rewindable test table with the given name, size, distribution and columns.
+     *
+     * @param name Name of the table.
+     * @param size Required size of the table.
+     * @param distr Distribution of the table.
+     * @param fields Column definitions as a flat sequence of name/type pairs: a {@link String} column
+     *      name followed by the {@link Class} of that column.
+     *      E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL", String.class)}.
+     * @return Instance of the {@link TestTable}.
+     * @throws IllegalArgumentException If {@code fields} is empty or has an odd number of elements.
+     */
+    private static TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
+        if (F.isEmpty(fields) || (fields.length & 1) == 1)
+            throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
+
+        RelDataTypeFactory.Builder rowType = new RelDataTypeFactory.Builder(TYPE_FACTORY);
+
+        int pos = 0;
+
+        // Consume the array pairwise: column name first, column class second.
+        while (pos < fields.length) {
+            String colName = (String)fields[pos++];
+            Class<?> colCls = (Class<?>)fields[pos++];
+
+            rowType.add(colName, TYPE_FACTORY.createJavaType(colCls));
+        }
+
+        return new TestTable(name, rowType.build(), RewindabilityTrait.REWINDABLE, size) {
+            @Override public IgniteDistribution distribution() {
+                return distr;
+            }
+        };
+    }
+
+    /**
+     * Builds a schema named {@code PUBLIC} and registers every given table in it under the
+     * table's own name.
+     *
+     * @param tbls Tables to create schema for.
+     * @return Public schema.
+     */
+    private static IgniteSchema createSchema(TestTable... tbls) {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        for (int i = 0; i < tbls.length; i++)
+            publicSchema.addTable(tbls[i].name(), tbls[i]);
+
+        return publicSchema;
+    }
+
+    /**
+     * Creates a matcher that accepts only collections containing exactly the given number of elements.
+     * A value that is {@code null} or is not a {@link Collection} is rejected.
+     *
+     * @param size Required size.
+     * @return Matcher verifying that the examined collection has exactly {@code size} elements.
+     */
+    private static <T extends Collection<?>> Matcher<T> hasSize(int size) {
+        // The description must stay truthful for any requested size, including zero.
+        return new CustomMatcher<T>("a collection with size=" + size) {
+            @Override public boolean matches(Object item) {
+                return item instanceof Collection && ((Collection<?>)item).size() == size;
+            }
+        };
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 4c7f024..c6cb5ba 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlan
import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
@@ -39,6 +40,7 @@ import org.junit.runners.Suite;
AggregatePlannerTest.class,
HashAggregatePlannerTest.class,
SortAggregatePlannerTest.class,
+ JoinColocationPlannerTest.class,
})
public class PlannerTestSuite {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 59f9488..476495e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -168,7 +168,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Pattern to test incoming query to decide whether this query should be executed with Calcite or H2. */
private static final Pattern IS_SELECT_OR_EXPLAIN_PATTERN =
- Pattern.compile("^\\s*(select|explain plan for)", CASE_INSENSITIVE);
+ Pattern.compile("^\\s*(select|explain plan)", CASE_INSENSITIVE);
/** For tests. */
public static Class<? extends GridQueryIndexing> idxCls;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 39b6e6f..c2b554e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -1293,20 +1292,6 @@ public class GridH2Table extends TableBase {
* Refreshes table stats if they are possibly outdated, must be called only in client mode.
*/
private void refreshStatsIfNeededEx() {
- boolean client = cacheInfo.cacheContext().kernalContext().clientNode();
-
- GridQueryProcessor qryProc = cacheInfo.cacheContext().kernalContext().query();
- boolean experimental = qryProc.useExperimentalEngine();
-
- assert experimental;
-
- if (!client) {
- refreshStatsIfNeeded();
-
- return;
- }
-
- // Update stats if total table size changed significantly since the last stats update.
if (cliReqCnt.getAndIncrement() % STATS_CLI_UPDATE_THRESHOLD == 0) {
TableStatistics stats = tblStats;