You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/12/12 11:22:04 UTC
[ignite-3] branch main updated: IGNITE-18224 Sql. Allow hash output distribution for SET relations (#1416)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a5b885e84f IGNITE-18224 Sql. Allow hash output distribution for SET relations (#1416)
a5b885e84f is described below
commit a5b885e84f6ac2c2bcc06a1658b3b94445ef78c9
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Mon Dec 12 14:21:58 2022 +0300
IGNITE-18224 Sql. Allow hash output distribution for SET relations (#1416)
---
.../ignite/internal/sql/engine/ItSetOpTest.java | 34 +++
.../internal/sql/engine/prepare/PlannerPhase.java | 4 +-
...ntersect.java => IgniteColocatedIntersect.java} | 19 +-
...eSingleMinus.java => IgniteColocatedMinus.java} | 19 +-
...eSingleSetOp.java => IgniteColocatedSetOp.java} | 50 +++-
.../internal/sql/engine/rel/set/IgniteSetOp.java | 2 +-
.../sql/engine/rule/MinusConverterRule.java | 4 +-
.../sql/engine/rule/SetOpConverterRule.java | 28 +-
.../sql/engine/planner/SetOpPlannerTest.java | 290 ++++++++++++++-------
9 files changed, 317 insertions(+), 133 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
index 250759c7cb..c8aa0c2809 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -185,6 +186,39 @@ public class ItSetOpTest extends AbstractBasicIntegrationTest {
assertEquals(2, countIf(rows, r -> r.get(0).equals("Igor1")));
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18211")
+ public void testSetOpColocated() {
+ sql("CREATE TABLE emp(empid INTEGER, deptid INTEGER, name VARCHAR, PRIMARY KEY(empid, deptid)) COLOCATE BY (deptid)");
+ sql("CREATE TABLE dept(deptid INTEGER, name VARCHAR, PRIMARY KEY(deptid))");
+
+ sql("INSERT INTO emp VALUES (0, 0, 'test0'), (1, 0, 'test1'), (2, 1, 'test2')");
+ sql("INSERT INTO dept VALUES (0, 'test0'), (1, 'test1'), (2, 'test2')");
+
+ assertQuery("SELECT deptid, name FROM emp EXCEPT SELECT deptid, name FROM dept")
+ .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedMinus.*"))
+ .returns(0, "test1")
+ .returns(1, "test2")
+ .check();
+
+ assertQuery("SELECT deptid, name FROM dept EXCEPT SELECT deptid, name FROM emp")
+ .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedMinus.*"))
+ .returns(1, "test1")
+ .returns(2, "test2")
+ .check();
+
+ assertQuery("SELECT deptid FROM dept EXCEPT SELECT deptid FROM emp")
+ .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedMinus.*"))
+ .returns(2)
+ .check();
+
+ assertQuery("SELECT deptid FROM dept INTERSECT SELECT deptid FROM emp")
+ .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedIntersect.*"))
+ .returns(0)
+ .returns(1)
+ .check();
+ }
+
/**
* Test that set op node can be rewinded.
*/
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 9d9abcbe9d..a388a4e388 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -212,9 +212,9 @@ public enum PlannerPhase {
HashAggregateConverterRule.MAP_REDUCE,
SortAggregateConverterRule.COLOCATED,
SortAggregateConverterRule.MAP_REDUCE,
- SetOpConverterRule.SINGLE_MINUS,
+ SetOpConverterRule.COLOCATED_MINUS,
SetOpConverterRule.MAP_REDUCE_MINUS,
- SetOpConverterRule.SINGLE_INTERSECT,
+ SetOpConverterRule.COLOCATED_INTERSECT,
SetOpConverterRule.MAP_REDUCE_INTERSECT,
ProjectConverterRule.INSTANCE,
FilterConverterRule.INSTANCE,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleIntersect.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedIntersect.java
similarity index 75%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleIntersect.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedIntersect.java
index 63918de705..87a741aee9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleIntersect.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedIntersect.java
@@ -27,10 +27,10 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRelVisitor;
import org.apache.ignite.internal.sql.engine.util.Commons;
/**
- * Physical node for INTERSECT operator which inputs satisfy SINGLE distribution.
+ * Physical node for INTERSECT operator which inputs are colocated.
*/
-public class IgniteSingleIntersect extends IgniteIntersect implements IgniteSingleSetOp {
- public IgniteSingleIntersect(
+public class IgniteColocatedIntersect extends IgniteIntersect implements IgniteColocatedSetOp {
+ public IgniteColocatedIntersect(
RelOptCluster cluster,
RelTraitSet traitSet,
List<RelNode> inputs,
@@ -40,23 +40,24 @@ public class IgniteSingleIntersect extends IgniteIntersect implements IgniteSing
}
/**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Constructor used for deserialization.
+ *
+ * @param input Serialized representation.
*/
- public IgniteSingleIntersect(RelInput input) {
+ public IgniteColocatedIntersect(RelInput input) {
super(input);
}
/** {@inheritDoc} */
@Override
- public IgniteSingleIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new IgniteSingleIntersect(getCluster(), traitSet, inputs, all);
+ public IgniteColocatedIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteColocatedIntersect(getCluster(), traitSet, inputs, all);
}
/** {@inheritDoc} */
@Override
public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteSingleIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
+ return new IgniteColocatedIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleMinus.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedMinus.java
similarity index 76%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleMinus.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedMinus.java
index 49ade1db0d..0b23c00ad9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleMinus.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedMinus.java
@@ -27,10 +27,10 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRelVisitor;
import org.apache.ignite.internal.sql.engine.util.Commons;
/**
- * Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
+ * Physical node for MINUS (EXCEPT) operator which inputs are colocated.
*/
-public class IgniteSingleMinus extends IgniteMinus implements IgniteSingleSetOp {
- public IgniteSingleMinus(
+public class IgniteColocatedMinus extends IgniteMinus implements IgniteColocatedSetOp {
+ public IgniteColocatedMinus(
RelOptCluster cluster,
RelTraitSet traitSet,
List<RelNode> inputs,
@@ -40,23 +40,24 @@ public class IgniteSingleMinus extends IgniteMinus implements IgniteSingleSetOp
}
/**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Constructor used for deserialization.
+ *
+ * @param input Serialized representation.
*/
- public IgniteSingleMinus(RelInput input) {
+ public IgniteColocatedMinus(RelInput input) {
super(input);
}
/** {@inheritDoc} */
@Override
- public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
+ public IgniteColocatedMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteColocatedMinus(getCluster(), traitSet, inputs, all);
}
/** {@inheritDoc} */
@Override
public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteSingleMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
+ return new IgniteColocatedMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleSetOp.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedSetOp.java
similarity index 65%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleSetOp.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedSetOp.java
index 499d4bca12..3e8ca24190 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleSetOp.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedSetOp.java
@@ -17,23 +17,26 @@
package org.apache.ignite.internal.sql.engine.rel.set;
+import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.util.Commons;
/**
- * Physical node for set op (MINUS, INTERSECT) operator which inputs satisfy SINGLE distribution.
+ * Physical node for set op (MINUS, INTERSECT) operator which inputs are colocated.
*/
-public interface IgniteSingleSetOp extends IgniteSetOp {
+public interface IgniteColocatedSetOp extends IgniteSetOp {
/** {@inheritDoc} */
@Override
public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
@@ -69,15 +72,46 @@ public interface IgniteSingleSetOp extends IgniteSetOp {
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
) {
- boolean single = inputTraits.stream()
- .map(TraitUtils::distribution)
- .allMatch(d -> d.satisfies(IgniteDistributions.single()));
+ boolean haveSingle = false;
+ IgniteDistribution hashDistribution = null;
- if (!single) {
- return List.of();
+ for (RelTraitSet traits : inputTraits) {
+ IgniteDistribution distribution = TraitUtils.distribution(traits);
+
+ if (distribution == IgniteDistributions.single()) {
+ if (hashDistribution != null) { // Single incompatible with hash.
+ return ImmutableList.of();
+ }
+
+ haveSingle = true;
+ } else if (distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) {
+ if (haveSingle) { // Hash incompatible with single.
+ return ImmutableList.of();
+ }
+
+ if (hashDistribution == null) {
+ hashDistribution = distribution;
+ } else if (!hashDistribution.satisfies(distribution)) {
+ return ImmutableList.of();
+ }
+
+ } else if (distribution != IgniteDistributions.broadcast()) {
+ return ImmutableList.of();
+ }
}
- return List.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
+ assert hashDistribution == null || !haveSingle;
+
+ if (haveSingle) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
+ } else if (hashDistribution != null) {
+ IgniteDistribution distribution = hashDistribution;
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(distribution),
+ Commons.transform(inputTraits, t -> t.replace(distribution))));
+ } else {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.broadcast()), inputTraits));
+ }
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java
index 576861a6e3..0a37b1d10f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java
@@ -71,7 +71,7 @@ public interface IgniteSetOp extends TraitsAwareIgniteRel {
double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
- return costFactory.makeCost(inputRows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+ return costFactory.makeCost(inputRows, inputRows * IgniteCost.HASH_LOOKUP_COST, 0, mem, 0);
}
/** Aggregate type. */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java
index aa0274ee2f..ecc5d1b7e7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java
@@ -28,9 +28,9 @@ import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapMinus;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleMinus;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
/**
@@ -58,7 +58,7 @@ public class MinusConverterRule {
RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
- return new IgniteSingleMinus(cluster, outTrait, inputs, setOp.all);
+ return new IgniteColocatedMinus(cluster, outTrait, inputs, setOp.all);
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java
index 916972d9b9..8a9ffdf5aa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java
@@ -31,21 +31,21 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedIntersect;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapIntersect;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapMinus;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceIntersect;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleIntersect;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleMinus;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
/**
* Set op (MINUS, INTERSECT) converter rule.
*/
public class SetOpConverterRule {
- public static final RelOptRule SINGLE_MINUS = new SingleMinusConverterRule();
+ public static final RelOptRule COLOCATED_MINUS = new ColocatedMinusConverterRule();
- public static final RelOptRule SINGLE_INTERSECT = new SingleIntersectConverterRule();
+ public static final RelOptRule COLOCATED_INTERSECT = new ColocatedIntersectConverterRule();
public static final RelOptRule MAP_REDUCE_MINUS = new MapReduceMinusConverterRule();
@@ -55,8 +55,8 @@ public class SetOpConverterRule {
// No-op.
}
- private abstract static class SingleSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
- SingleSetOpConverterRule(Class<T> cls, String desc) {
+ private abstract static class ColocatedSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
+ ColocatedSetOpConverterRule(Class<T> cls, String desc) {
super(cls, desc);
}
@@ -75,29 +75,29 @@ public class SetOpConverterRule {
}
}
- private static class SingleMinusConverterRule extends SingleSetOpConverterRule<LogicalMinus> {
- SingleMinusConverterRule() {
- super(LogicalMinus.class, "SingleMinusConverterRule");
+ private static class ColocatedMinusConverterRule extends ColocatedSetOpConverterRule<LogicalMinus> {
+ ColocatedMinusConverterRule() {
+ super(LogicalMinus.class, "ColocatedMinusConverterRule");
}
/** {@inheritDoc} */
@Override
PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
boolean all) {
- return new IgniteSingleMinus(cluster, traits, inputs, all);
+ return new IgniteColocatedMinus(cluster, traits, inputs, all);
}
}
- private static class SingleIntersectConverterRule extends SingleSetOpConverterRule<LogicalIntersect> {
- SingleIntersectConverterRule() {
- super(LogicalIntersect.class, "SingleIntersectConverterRule");
+ private static class ColocatedIntersectConverterRule extends ColocatedSetOpConverterRule<LogicalIntersect> {
+ ColocatedIntersectConverterRule() {
+ super(LogicalIntersect.class, "ColocatedIntersectConverterRule");
}
/** {@inheritDoc} */
@Override
PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
boolean all) {
- return new IgniteSingleIntersect(cluster, traits, inputs, all);
+ return new IgniteColocatedIntersect(cluster, traits, inputs, all);
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
index be2071053a..1970557e53 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
@@ -17,8 +17,15 @@
package org.apache.ignite.internal.sql.engine.planner;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution.Type;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedIntersect;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedSetOp;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapIntersect;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapMinus;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapSetOp;
@@ -26,14 +33,12 @@ import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceIntersect;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceMinus;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceSetOp;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleIntersect;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleMinus;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleSetOp;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -48,7 +53,8 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
/**
* Setup.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ *
+ * <p>Prepares multiple test tables with different distributions.
*/
@BeforeAll
public void setup() {
@@ -70,15 +76,28 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
createTable(publicSchema, "SINGLE_TBL2", type, IgniteDistributions.single());
createTable(publicSchema, "AFFINITY_TBL1", type,
- IgniteDistributions.affinity(0, "Test1", "hash"));
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+ // IgniteDistributions.affinity(0, "Test1", "hash"));
+ IgniteDistributions.hash(List.of(0)));
+
createTable(publicSchema, "AFFINITY_TBL2", type,
- IgniteDistributions.affinity(0, "Test2", "hash"));
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+ // IgniteDistributions.affinity(0, "Test2", "hash"));
+ IgniteDistributions.hash(List.of(0)));
+
+ createTable(publicSchema, "AFFINITY_TBL3", type,
+ IgniteDistributions.affinity(1, "Test3", "hash"));
+
+ createTable(publicSchema, "AFFINITY_TBL4", type,
+ IgniteDistributions.affinity(0, "Test4", "hash2"));
}
/**
- * TestSetOpRandom.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on two tables with random distribution.
+ *
+ * <p>{@link Type#RANDOM_DISTRIBUTED Random} distribution cannot be colocated
+ * with other random distribution.
*
* @throws Exception If failed.
*/
@@ -98,8 +117,10 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
}
/**
- * TestSetOpAllRandom.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations (with ALL flag enabled) on two tables with random distribution.
+ *
+ * <p>{@link Type#RANDOM_DISTRIBUTED Random} distribution cannot be colocated
+ * with other random distribution.
*
* @throws Exception If failed.
*/
@@ -114,13 +135,14 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
.and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("random_tbl1")))
.and(input(1, isTableScan("random_tbl2")))
- )),
- "SingleIntersectConverterRule");
+ )));
}
/**
- * TestSetOpBroadcast.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on two tables with broadcast distribution.
+ *
+ * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+ * distribution satisfies any other distribution.
*
* @throws Exception If failed.
*/
@@ -131,15 +153,17 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM broadcast_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
.and(input(0, isTableScan("broadcast_tbl1")))
.and(input(1, isTableScan("broadcast_tbl2")))
);
}
/**
- * TestSetOpSingle.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on two tables with single distribution.
+ *
+ * <p>The operation is considered colocated because {@link Type#SINGLETON single} distribution
+ * satisfies other single distribution.
*
* @throws Exception If failed.
*/
@@ -150,14 +174,16 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM single_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, isTableScan("single_tbl2"))));
}
/**
- * TestSetOpSingleAndRandom.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on two tables with single and random distribution.
+ *
+ * <p>{@link Type#SINGLETON Single} distribution cannot be colocated
+ * with {@link Type#RANDOM_DISTRIBUTED random} distribution.
*
* @throws Exception If failed.
*/
@@ -168,14 +194,16 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM random_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+ .and(hasDistribution(IgniteDistributions.single()))
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, hasChildThat(isTableScan("random_tbl1")))));
}
/**
- * TestSetOpSingleAndAffinity.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on two tables with single and affinity distribution.
+ *
+ * <p>{@link Type#SINGLETON Single} distribution cannot be colocated with affinity distribution.
*
* @throws Exception If failed.
*/
@@ -186,14 +214,17 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM affinity_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+ .and(hasDistribution(IgniteDistributions.single()))
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, hasChildThat(isTableScan("affinity_tbl1")))));
}
/**
- * TestSetOpSingleAndBroadcast.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on two tables with single and broadcast distribution.
+ *
+ * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+ * distribution satisfies any other distribution.
*
* @throws Exception If failed.
*/
@@ -204,15 +235,18 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM broadcast_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, isTableScan("broadcast_tbl1")))
);
}
+
/**
- * TestSetOpAffinity.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests SET operations on tables with the same affinity distribution.
+ *
+ * <p>The operation is considered colocated because the tables are
+ * compared against the corresponding colocation columns.
*
* @throws Exception If failed.
*/
@@ -223,17 +257,112 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM affinity_tbl2 ";
+ assertPlan(sql, publicSchema, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(setOp.colocated)
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+ // .and(hasDistribution(IgniteDistributions.affinity(0, null, "hash")))
+ .and(input(0, isTableScan("affinity_tbl1")))
+ .and(input(1, isTableScan("affinity_tbl2")))
+ ))
+ );
+ }
+
+ /**
+ * Tests SET operations on two tables with affinity and broadcast distribution.
+ *
+ * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+ * distribution satisfies any other distribution.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @EnumSource
+ public void testSetOpAffinityAndBroadcast(SetOp setOp) throws Exception {
+ String sql = "SELECT * FROM affinity_tbl1 "
+ + setOp(setOp)
+ + "SELECT * FROM broadcast_tbl1 ";
+
+ assertPlan(sql, publicSchema, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(setOp.colocated)
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+ // .and(hasDistribution(IgniteDistributions.affinity(0, null, "hash")))
+ .and(input(0, isTableScan("affinity_tbl1")))
+ .and(input(1, isInstanceOf(IgniteTrimExchange.class)
+ .and(input(isTableScan("broadcast_tbl1")))
+ ))
+ ))
+ );
+ }
+
+ /**
+ * Tests SET operations on tables with different affinity distribution.
+ *
+ * <p>Different affinity distributions cannot be colocated.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @EnumSource
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18211")
+ public void testSetOpNonColocatedAffinity(SetOp setOp) throws Exception {
+ String sql = "SELECT * FROM affinity_tbl1 "
+ + setOp(setOp)
+ + "SELECT * FROM affinity_tbl3 ";
+
assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
.and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("affinity_tbl1")))
- .and(input(1, isTableScan("affinity_tbl2")))
+ .and(input(1, isTableScan("affinity_tbl3")))
+ ))
+ );
+
+ sql = "SELECT * FROM affinity_tbl1 "
+ + setOp(setOp)
+ + "SELECT * FROM affinity_tbl4 ";
+
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, isTableScan("affinity_tbl1")))
+ .and(input(1, isTableScan("affinity_tbl4")))
))
);
}
/**
- * TestSetOpBroadcastAndRandom.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests two SET operations (nested and outer) on two tables with the same affinity distribution.
+ *
+ * <p>Nested operation is considered colocated because the tables are compared against the corresponding colocation columns.
+ * Outer operation is considered colocated because the result of nested operation must have the distribution of one of the participating
+ * tables.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @EnumSource
+ public void testSetOpAffinityNested(SetOp setOp) throws Exception {
+ String sql = "SELECT * FROM affinity_tbl2 " + setOp(setOp) + "("
+ + " SELECT * FROM affinity_tbl1 "
+ + setOp(setOp)
+ + " SELECT * FROM affinity_tbl2"
+ + ")";
+
+ assertPlan(sql, publicSchema, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(setOp.colocated)
+ .and(input(0, isTableScan("affinity_tbl2")))
+ .and(input(1, isInstanceOf(setOp.colocated)
+ .and(input(0, isTableScan("affinity_tbl1")))
+ .and(input(1, isTableScan("affinity_tbl2")))
+ ))
+ )),
+ "MinusMergeRule", "IntersectMergeRule"
+ );
+ }
+
+ /**
+ * Tests SET operations on two tables with broadcast and random distribution.
+ *
+ * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+ * distribution satisfies any other distribution.
*
* @throws Exception If failed.
*/
@@ -244,15 +373,19 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ setOp(setOp)
+ "SELECT * FROM broadcast_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
.and(input(0, hasChildThat(isTableScan("random_tbl1"))))
.and(input(1, isTableScan("broadcast_tbl1")))
);
}
/**
- * TestSetOpRandomNested.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests two SET operations (nested and outer) on two tables with random distribution.
+ *
+ * <p>Nested operation cannot be colocated because {@link Type#RANDOM_DISTRIBUTED random}
+ * distribution cannot be colocated with other random distribution.
+ * Outer operation is considered colocated because the result of nested operation must have
+ * the distribution of one of the participating tables.
*
* @throws Exception If failed.
*/
@@ -266,31 +399,25 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ " SELECT * FROM random_tbl2"
+ ")";
- if (setOp == SetOp.EXCEPT) {
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
- .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
- .and(input(1, isInstanceOf(setOp.reduce)
- .and(hasChildThat(isInstanceOf(setOp.map)
- .and(input(0, isTableScan("random_tbl1")))
- .and(input(1, isTableScan("random_tbl2")))
- ))
- ))
- );
- } else {
- // INTERSECT operator is commutative and can be merged.
- assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
- .and(hasChildThat(isInstanceOf(setOp.map)
- .and(input(0, isTableScan("random_tbl2")))
- .and(input(1, isTableScan("random_tbl1")))
- .and(input(2, isTableScan("random_tbl2")))
- ))
- );
- }
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+ .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
+ .and(input(1, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
+ )),
+ "IntersectMergeRule"
+ );
}
/**
- * TestSetOpBroadcastAndRandomNested.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests two SET operations (nested and outer) on three tables with two random (nested) and one broadcast (outer) distribution.
+ *
+ * <p>Nested operation cannot be colocated because {@link Type#RANDOM_DISTRIBUTED random}
+ * distribution cannot be colocated with another random distribution.
+ * Outer operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+ * distribution satisfies any other distribution.
*
* @throws Exception If failed.
*/
@@ -305,31 +432,20 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
+ " SELECT * FROM random_tbl2"
+ ")";
- if (setOp == SetOp.EXCEPT) {
- assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
- .and(input(0, isTableScan("broadcast_tbl1")))
- .and(input(1, isInstanceOf(setOp.reduce)
- .and(hasChildThat(isInstanceOf(setOp.map)
- .and(input(0, isTableScan("random_tbl1")))
- .and(input(1, isTableScan("random_tbl2")))
- ))
- ))
- );
- } else {
- // INTERSECT operator is commutative and can be merged.
- assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
- .and(hasChildThat(isInstanceOf(setOp.map)
- .and(input(0, nodeOrAnyChild(isTableScan("broadcast_tbl1"))))
- .and(input(1, isTableScan("random_tbl1")))
- .and(input(2, isTableScan("random_tbl2")))
- ))
- );
- }
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+ .and(input(0, isTableScan("broadcast_tbl1")))
+ .and(input(1, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
+ )),
+ "IntersectMergeRule"
+ );
}
/**
- * TestSetOpMerge.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests multiple SET operations on multiple tables with affinity and random distribution.
*
* @throws Exception If failed.
*/
@@ -355,8 +471,7 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
}
/**
- * TestSetOpAllMerge.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests multiple SET operations (with ALL flag enabled) on multiple tables with affinity and random distribution.
*
* @throws Exception If failed.
*/
@@ -382,8 +497,7 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
}
/**
- * TestSetOpAllWithExceptMerge.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Tests two SET operations (with ALL flag enabled for the first one) on tables with affinity and random distribution.
*
* @throws Exception If failed.
*/
@@ -415,28 +529,28 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
enum SetOp {
EXCEPT(
- IgniteSingleMinus.class,
+ IgniteColocatedMinus.class,
IgniteMapMinus.class,
IgniteReduceMinus.class
),
INTERSECT(
- IgniteSingleIntersect.class,
+ IgniteColocatedIntersect.class,
IgniteMapIntersect.class,
IgniteReduceIntersect.class
);
- public final Class<? extends IgniteSingleSetOp> single;
+ public final Class<? extends IgniteColocatedSetOp> colocated;
public final Class<? extends IgniteMapSetOp> map;
public final Class<? extends IgniteReduceSetOp> reduce;
SetOp(
- Class<? extends IgniteSingleSetOp> single,
+ Class<? extends IgniteColocatedSetOp> colocated,
Class<? extends IgniteMapSetOp> map,
Class<? extends IgniteReduceSetOp> reduce) {
- this.single = single;
+ this.colocated = colocated;
this.map = map;
this.reduce = reduce;
}