You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/12/12 11:22:04 UTC

[ignite-3] branch main updated: IGNITE-18224 Sql. Allow hash output distribution for SET relations (#1416)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a5b885e84f IGNITE-18224 Sql. Allow hash output distribution for SET relations (#1416)
a5b885e84f is described below

commit a5b885e84f6ac2c2bcc06a1658b3b94445ef78c9
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Mon Dec 12 14:21:58 2022 +0300

    IGNITE-18224 Sql. Allow hash output distribution for SET relations (#1416)
---
 .../ignite/internal/sql/engine/ItSetOpTest.java    |  34 +++
 .../internal/sql/engine/prepare/PlannerPhase.java  |   4 +-
 ...ntersect.java => IgniteColocatedIntersect.java} |  19 +-
 ...eSingleMinus.java => IgniteColocatedMinus.java} |  19 +-
 ...eSingleSetOp.java => IgniteColocatedSetOp.java} |  50 +++-
 .../internal/sql/engine/rel/set/IgniteSetOp.java   |   2 +-
 .../sql/engine/rule/MinusConverterRule.java        |   4 +-
 .../sql/engine/rule/SetOpConverterRule.java        |  28 +-
 .../sql/engine/planner/SetOpPlannerTest.java       | 290 ++++++++++++++-------
 9 files changed, 317 insertions(+), 133 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
index 250759c7cb..c8aa0c2809 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -185,6 +186,39 @@ public class ItSetOpTest extends AbstractBasicIntegrationTest {
         assertEquals(2, countIf(rows, r -> r.get(0).equals("Igor1")));
     }
 
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18211")
+    public void testSetOpColocated() {
+        sql("CREATE TABLE emp(empid INTEGER, deptid INTEGER, name VARCHAR, PRIMARY KEY(empid, deptid)) COLOCATE BY (deptid)");
+        sql("CREATE TABLE dept(deptid INTEGER, name VARCHAR, PRIMARY KEY(deptid))");
+
+        sql("INSERT INTO emp VALUES (0, 0, 'test0'), (1, 0, 'test1'), (2, 1, 'test2')");
+        sql("INSERT INTO dept VALUES (0, 'test0'), (1, 'test1'), (2, 'test2')");
+
+        assertQuery("SELECT deptid, name FROM emp EXCEPT SELECT deptid, name FROM dept")
+                .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedMinus.*"))
+                .returns(0, "test1")
+                .returns(1, "test2")
+                .check();
+
+        assertQuery("SELECT deptid, name FROM dept EXCEPT SELECT deptid, name FROM emp")
+                .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedMinus.*"))
+                .returns(1, "test1")
+                .returns(2, "test2")
+                .check();
+
+        assertQuery("SELECT deptid FROM dept EXCEPT SELECT deptid FROM emp")
+                .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedMinus.*"))
+                .returns(2)
+                .check();
+
+        assertQuery("SELECT deptid FROM dept INTERSECT SELECT deptid FROM emp")
+                .matches(QueryChecker.matches(".*IgniteExchange.*IgniteColocatedIntersect.*"))
+                .returns(0)
+                .returns(1)
+                .check();
+    }
+
     /**
      * Test that set op node can be rewinded.
      */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 9d9abcbe9d..a388a4e388 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -212,9 +212,9 @@ public enum PlannerPhase {
             HashAggregateConverterRule.MAP_REDUCE,
             SortAggregateConverterRule.COLOCATED,
             SortAggregateConverterRule.MAP_REDUCE,
-            SetOpConverterRule.SINGLE_MINUS,
+            SetOpConverterRule.COLOCATED_MINUS,
             SetOpConverterRule.MAP_REDUCE_MINUS,
-            SetOpConverterRule.SINGLE_INTERSECT,
+            SetOpConverterRule.COLOCATED_INTERSECT,
             SetOpConverterRule.MAP_REDUCE_INTERSECT,
             ProjectConverterRule.INSTANCE,
             FilterConverterRule.INSTANCE,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleIntersect.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedIntersect.java
similarity index 75%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleIntersect.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedIntersect.java
index 63918de705..87a741aee9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleIntersect.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedIntersect.java
@@ -27,10 +27,10 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
- * Physical node for INTERSECT operator which inputs satisfy SINGLE distribution.
+ * Physical node for INTERSECT operator whose inputs are colocated.
  */
-public class IgniteSingleIntersect extends IgniteIntersect implements IgniteSingleSetOp {
-    public IgniteSingleIntersect(
+public class IgniteColocatedIntersect extends IgniteIntersect implements IgniteColocatedSetOp {
+    public IgniteColocatedIntersect(
             RelOptCluster cluster,
             RelTraitSet traitSet,
             List<RelNode> inputs,
@@ -40,23 +40,24 @@ public class IgniteSingleIntersect extends IgniteIntersect implements IgniteSing
     }
 
     /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Constructor used for deserialization.
+     *
+     * @param input Serialized representation.
      */
-    public IgniteSingleIntersect(RelInput input) {
+    public IgniteColocatedIntersect(RelInput input) {
         super(input);
     }
 
     /** {@inheritDoc} */
     @Override
-    public IgniteSingleIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-        return new IgniteSingleIntersect(getCluster(), traitSet, inputs, all);
+    public IgniteColocatedIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteColocatedIntersect(getCluster(), traitSet, inputs, all);
     }
 
     /** {@inheritDoc} */
     @Override
     public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteSingleIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
+        return new IgniteColocatedIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleMinus.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedMinus.java
similarity index 76%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleMinus.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedMinus.java
index 49ade1db0d..0b23c00ad9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleMinus.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedMinus.java
@@ -27,10 +27,10 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
- * Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
+ * Physical node for MINUS (EXCEPT) operator whose inputs are colocated.
  */
-public class IgniteSingleMinus extends IgniteMinus implements IgniteSingleSetOp {
-    public IgniteSingleMinus(
+public class IgniteColocatedMinus extends IgniteMinus implements IgniteColocatedSetOp {
+    public IgniteColocatedMinus(
             RelOptCluster cluster,
             RelTraitSet traitSet,
             List<RelNode> inputs,
@@ -40,23 +40,24 @@ public class IgniteSingleMinus extends IgniteMinus implements IgniteSingleSetOp
     }
 
     /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Constructor used for deserialization.
+     *
+     * @param input Serialized representation.
      */
-    public IgniteSingleMinus(RelInput input) {
+    public IgniteColocatedMinus(RelInput input) {
         super(input);
     }
 
     /** {@inheritDoc} */
     @Override
-    public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-        return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
+    public IgniteColocatedMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteColocatedMinus(getCluster(), traitSet, inputs, all);
     }
 
     /** {@inheritDoc} */
     @Override
     public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteSingleMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
+        return new IgniteColocatedMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleSetOp.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedSetOp.java
similarity index 65%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleSetOp.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedSetOp.java
index 499d4bca12..3e8ca24190 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSingleSetOp.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteColocatedSetOp.java
@@ -17,23 +17,26 @@
 
 package org.apache.ignite.internal.sql.engine.rel.set;
 
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
- * Physical node for set op (MINUS, INTERSECT) operator which inputs satisfy SINGLE distribution.
+ * Physical node for set op (MINUS, INTERSECT) operator whose inputs are colocated.
  */
-public interface IgniteSingleSetOp extends IgniteSetOp {
+public interface IgniteColocatedSetOp extends IgniteSetOp {
     /** {@inheritDoc} */
     @Override
     public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
@@ -69,15 +72,46 @@ public interface IgniteSingleSetOp extends IgniteSetOp {
             RelTraitSet nodeTraits,
             List<RelTraitSet> inputTraits
     ) {
-        boolean single = inputTraits.stream()
-                .map(TraitUtils::distribution)
-                .allMatch(d -> d.satisfies(IgniteDistributions.single()));
+        boolean haveSingle = false;
+        IgniteDistribution hashDistribution = null;
 
-        if (!single) {
-            return List.of();
+        for (RelTraitSet traits : inputTraits) {
+            IgniteDistribution distribution = TraitUtils.distribution(traits);
+
+            if (distribution == IgniteDistributions.single()) {
+                if (hashDistribution != null) { // Single incompatible with hash.
+                    return ImmutableList.of();
+                }
+
+                haveSingle = true;
+            } else if (distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) {
+                if (haveSingle) { // Hash incompatible with single.
+                    return ImmutableList.of();
+                }
+
+                if (hashDistribution == null) {
+                    hashDistribution = distribution;
+                } else if (!hashDistribution.satisfies(distribution)) {
+                    return ImmutableList.of();
+                }
+
+            } else if (distribution != IgniteDistributions.broadcast()) {
+                return ImmutableList.of();
+            }
         }
 
-        return List.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
+        assert hashDistribution == null || !haveSingle;
+
+        if (haveSingle) {
+            return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
+        } else if (hashDistribution != null) {
+            IgniteDistribution distribution = hashDistribution;
+
+            return ImmutableList.of(Pair.of(nodeTraits.replace(distribution),
+                    Commons.transform(inputTraits, t -> t.replace(distribution))));
+        } else {
+            return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.broadcast()), inputTraits));
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java
index 576861a6e3..0a37b1d10f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/set/IgniteSetOp.java
@@ -71,7 +71,7 @@ public interface IgniteSetOp extends TraitsAwareIgniteRel {
 
         double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
 
-        return costFactory.makeCost(inputRows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+        return costFactory.makeCost(inputRows, inputRows * IgniteCost.HASH_LOOKUP_COST, 0, mem, 0);
     }
 
     /** Aggregate type. */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java
index aa0274ee2f..ecc5d1b7e7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MinusConverterRule.java
@@ -28,9 +28,9 @@ import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapMinus;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleMinus;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 
 /**
@@ -58,7 +58,7 @@ public class MinusConverterRule {
             RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
             List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
 
-            return new IgniteSingleMinus(cluster, outTrait, inputs, setOp.all);
+            return new IgniteColocatedMinus(cluster, outTrait, inputs, setOp.all);
         }
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java
index 916972d9b9..8a9ffdf5aa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SetOpConverterRule.java
@@ -31,21 +31,21 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedIntersect;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapIntersect;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapMinus;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceIntersect;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleIntersect;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleMinus;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 
 /**
  * Set op (MINUS, INTERSECT) converter rule.
  */
 public class SetOpConverterRule {
-    public static final RelOptRule SINGLE_MINUS = new SingleMinusConverterRule();
+    public static final RelOptRule COLOCATED_MINUS = new ColocatedMinusConverterRule();
 
-    public static final RelOptRule SINGLE_INTERSECT = new SingleIntersectConverterRule();
+    public static final RelOptRule COLOCATED_INTERSECT = new ColocatedIntersectConverterRule();
 
     public static final RelOptRule MAP_REDUCE_MINUS = new MapReduceMinusConverterRule();
 
@@ -55,8 +55,8 @@ public class SetOpConverterRule {
         // No-op.
     }
 
-    private abstract static class SingleSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
-        SingleSetOpConverterRule(Class<T> cls, String desc) {
+    private abstract static class ColocatedSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
+        ColocatedSetOpConverterRule(Class<T> cls, String desc) {
             super(cls, desc);
         }
 
@@ -75,29 +75,29 @@ public class SetOpConverterRule {
         }
     }
 
-    private static class SingleMinusConverterRule extends SingleSetOpConverterRule<LogicalMinus> {
-        SingleMinusConverterRule() {
-            super(LogicalMinus.class, "SingleMinusConverterRule");
+    private static class ColocatedMinusConverterRule extends ColocatedSetOpConverterRule<LogicalMinus> {
+        ColocatedMinusConverterRule() {
+            super(LogicalMinus.class, "ColocatedMinusConverterRule");
         }
 
         /** {@inheritDoc} */
         @Override
         PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
                 boolean all) {
-            return new IgniteSingleMinus(cluster, traits, inputs, all);
+            return new IgniteColocatedMinus(cluster, traits, inputs, all);
         }
     }
 
-    private static class SingleIntersectConverterRule extends SingleSetOpConverterRule<LogicalIntersect> {
-        SingleIntersectConverterRule() {
-            super(LogicalIntersect.class, "SingleIntersectConverterRule");
+    private static class ColocatedIntersectConverterRule extends ColocatedSetOpConverterRule<LogicalIntersect> {
+        ColocatedIntersectConverterRule() {
+            super(LogicalIntersect.class, "ColocatedIntersectConverterRule");
         }
 
         /** {@inheritDoc} */
         @Override
         PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
                 boolean all) {
-            return new IgniteSingleIntersect(cluster, traits, inputs, all);
+            return new IgniteColocatedIntersect(cluster, traits, inputs, all);
         }
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
index be2071053a..1970557e53 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
@@ -17,8 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.planner;
 
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution.Type;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedIntersect;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
+import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedSetOp;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapIntersect;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapMinus;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapSetOp;
@@ -26,14 +33,12 @@ import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceIntersect;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceMinus;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceSetOp;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleIntersect;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleMinus;
-import org.apache.ignite.internal.sql.engine.rel.set.IgniteSingleSetOp;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -48,7 +53,8 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
 
     /**
      * Setup.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     *
+     * <p>Prepares multiple test tables with different distributions.
      */
     @BeforeAll
     public void setup() {
@@ -70,15 +76,28 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
         createTable(publicSchema, "SINGLE_TBL2", type, IgniteDistributions.single());
 
         createTable(publicSchema, "AFFINITY_TBL1", type,
-                IgniteDistributions.affinity(0, "Test1", "hash"));
+                // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+                // IgniteDistributions.affinity(0, "Test1", "hash"));
+                IgniteDistributions.hash(List.of(0)));
+
 
         createTable(publicSchema, "AFFINITY_TBL2", type,
-                IgniteDistributions.affinity(0, "Test2", "hash"));
+                // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+                // IgniteDistributions.affinity(0, "Test2", "hash"));
+                IgniteDistributions.hash(List.of(0)));
+
+        createTable(publicSchema, "AFFINITY_TBL3", type,
+                IgniteDistributions.affinity(1, "Test3", "hash"));
+
+        createTable(publicSchema, "AFFINITY_TBL4", type,
+                IgniteDistributions.affinity(0, "Test4", "hash2"));
     }
 
     /**
-     * TestSetOpRandom.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on two tables with random distribution.
+     *
+     * <p>{@link Type#RANDOM_DISTRIBUTED Random} distribution cannot be colocated
+     * with other random distribution.
      *
      * @throws Exception If failed.
      */
@@ -98,8 +117,10 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
     }
 
     /**
-     * TestSetOpAllRandom.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations (with ALL flag enabled) on two tables with random distribution.
+     *
+     * <p>{@link Type#RANDOM_DISTRIBUTED Random} distribution cannot be colocated
+     * with other random distribution.
      *
      * @throws Exception If failed.
      */
@@ -114,13 +135,14 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                         .and(hasChildThat(isInstanceOf(setOp.map)
                                 .and(input(0, isTableScan("random_tbl1")))
                                 .and(input(1, isTableScan("random_tbl2")))
-                        )),
-                "SingleIntersectConverterRule");
+                        )));
     }
 
     /**
-     * TestSetOpBroadcast.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on two tables with broadcast distribution.
+     *
+     * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+     * distribution satisfies any other distribution.
      *
      * @throws Exception If failed.
      */
@@ -131,15 +153,17 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM broadcast_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
                 .and(input(0, isTableScan("broadcast_tbl1")))
                 .and(input(1, isTableScan("broadcast_tbl2")))
         );
     }
 
     /**
-     * TestSetOpSingle.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on two tables with single distribution.
+     *
+     * <p>The operation is considered colocated because {@link Type#SINGLETON single} distribution
+     * satisfies other single distribution.
      *
      * @throws Exception If failed.
      */
@@ -150,14 +174,16 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM single_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
                 .and(input(0, isTableScan("single_tbl1")))
                 .and(input(1, isTableScan("single_tbl2"))));
     }
 
     /**
-     * TestSetOpSingleAndRandom.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on two tables with single and random distribution.
+     *
+     * <p>{@link Type#SINGLETON Single} distribution cannot be colocated
+     * with {@link Type#RANDOM_DISTRIBUTED random} distribution.
      *
      * @throws Exception If failed.
      */
@@ -168,14 +194,16 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM random_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+                .and(hasDistribution(IgniteDistributions.single()))
                 .and(input(0, isTableScan("single_tbl1")))
                 .and(input(1, hasChildThat(isTableScan("random_tbl1")))));
     }
 
     /**
-     * TestSetOpSingleAndAffinity.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on two tables with single and affinity distribution.
+     *
+     * <p>{@link Type#SINGLETON Single} distribution cannot be colocated with affinity distribution.
      *
      * @throws Exception If failed.
      */
@@ -186,14 +214,17 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM affinity_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+                .and(hasDistribution(IgniteDistributions.single()))
                 .and(input(0, isTableScan("single_tbl1")))
                 .and(input(1, hasChildThat(isTableScan("affinity_tbl1")))));
     }
 
     /**
-     * TestSetOpSingleAndBroadcast.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on two tables with single and broadcast distribution.
+     *
+     * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+     * distribution satisfies any other distribution.
      *
      * @throws Exception If failed.
      */
@@ -204,15 +235,18 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM broadcast_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
                 .and(input(0, isTableScan("single_tbl1")))
                 .and(input(1, isTableScan("broadcast_tbl1")))
         );
     }
 
+
     /**
-     * TestSetOpAffinity.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests SET operations on tables with the same affinity distribution.
+     *
+     * <p>The operation is considered colocated because the tables are
+     * compared against the corresponding colocation columns.
      *
      * @throws Exception If failed.
      */
@@ -223,17 +257,112 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM affinity_tbl2 ";
 
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteExchange.class)
+                .and(input(isInstanceOf(setOp.colocated)
+                        // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+                        // .and(hasDistribution(IgniteDistributions.affinity(0, null, "hash")))
+                        .and(input(0, isTableScan("affinity_tbl1")))
+                        .and(input(1, isTableScan("affinity_tbl2")))
+                ))
+        );
+    }
+
+    /**
+     * Tests SET operations on two tables with affinity and broadcast distribution.
+     *
+     * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+     * distribution satisfies any other distribution.
+     *
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @EnumSource
+    public void testSetOpAffinityAndBroadcast(SetOp setOp) throws Exception {
+        String sql = "SELECT * FROM affinity_tbl1 "
+                + setOp(setOp)
+                + "SELECT * FROM broadcast_tbl1 ";
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteExchange.class)
+                .and(input(isInstanceOf(setOp.colocated)
+                        // TODO https://issues.apache.org/jira/browse/IGNITE-18211
+                        // .and(hasDistribution(IgniteDistributions.affinity(0, null, "hash")))
+                        .and(input(0, isTableScan("affinity_tbl1")))
+                        .and(input(1, isInstanceOf(IgniteTrimExchange.class)
+                                .and(input(isTableScan("broadcast_tbl1")))
+                        ))
+                ))
+        );
+    }
+
+    /**
+     * Tests SET operations on tables with different affinity distribution.
+     *
+     * <p>Different affinity distributions cannot be colocated.
+     *
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18211")
+    public void testSetOpNonColocatedAffinity(SetOp setOp) throws Exception {
+        String sql = "SELECT * FROM affinity_tbl1 "
+                + setOp(setOp)
+                + "SELECT * FROM affinity_tbl3 ";
+
         assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
                 .and(hasChildThat(isInstanceOf(setOp.map)
                         .and(input(0, isTableScan("affinity_tbl1")))
-                        .and(input(1, isTableScan("affinity_tbl2")))
+                        .and(input(1, isTableScan("affinity_tbl3")))
+                ))
+        );
+
+        sql = "SELECT * FROM affinity_tbl1 "
+                + setOp(setOp)
+                + "SELECT * FROM affinity_tbl4 ";
+
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+                .and(hasChildThat(isInstanceOf(setOp.map)
+                        .and(input(0, isTableScan("affinity_tbl1")))
+                        .and(input(1, isTableScan("affinity_tbl4")))
                 ))
         );
     }
 
     /**
-     * TestSetOpBroadcastAndRandom.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests two SET operations (nested and outer) on two tables with the same affinity distribution.
+     *
+     * <p>Nested operation is considered colocated because the tables are compared against the corresponding collocation columns.
+     * Outer operation is considered colocated because the result of nested operation must have the distribution of one of the
+     * participating tables.
+     *
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @EnumSource
+    public void testSetOpAffinityNested(SetOp setOp) throws Exception {
+        String sql = "SELECT * FROM affinity_tbl2 " + setOp(setOp) + "("
+                + "   SELECT * FROM affinity_tbl1 "
+                + setOp(setOp)
+                + "   SELECT * FROM affinity_tbl2"
+                + ")";
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(setOp.colocated)
+                                .and(input(0, isTableScan("affinity_tbl2")))
+                                .and(input(1, isInstanceOf(setOp.colocated)
+                                        .and(input(0, isTableScan("affinity_tbl1")))
+                                        .and(input(1, isTableScan("affinity_tbl2")))
+                                ))
+                        )),
+                "MinusMergeRule", "IntersectMergeRule"
+        );
+    }
+
+    /**
+     * Tests SET operations on two tables with broadcast and random distribution.
+     *
+     * <p>The operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+     * distribution satisfies any other distribution.
      *
      * @throws Exception If failed.
      */
@@ -244,15 +373,19 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + setOp(setOp)
                 + "SELECT * FROM broadcast_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
                 .and(input(0, hasChildThat(isTableScan("random_tbl1"))))
                 .and(input(1, isTableScan("broadcast_tbl1")))
         );
     }
 
     /**
-     * TestSetOpRandomNested.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests two SET operations (nested and outer) on two tables with random distribution.
+     *
+     * <p>Nested operation cannot be colocated because {@link Type#RANDOM_DISTRIBUTED random}
+     * distribution cannot be colocated with other random distribution.
+     * Outer operation is considered colocated because the result of nested operation must have
+     * the distribution of one of the participating tables.
      *
      * @throws Exception If failed.
      */
@@ -266,31 +399,25 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + "   SELECT * FROM random_tbl2"
                 + ")";
 
-        if (setOp == SetOp.EXCEPT) {
-            assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
-                    .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
-                    .and(input(1, isInstanceOf(setOp.reduce)
-                            .and(hasChildThat(isInstanceOf(setOp.map)
-                                    .and(input(0, isTableScan("random_tbl1")))
-                                    .and(input(1, isTableScan("random_tbl2")))
-                            ))
-                    ))
-            );
-        } else {
-            // INTERSECT operator is commutative and can be merged.
-            assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
-                    .and(hasChildThat(isInstanceOf(setOp.map)
-                            .and(input(0, isTableScan("random_tbl2")))
-                            .and(input(1, isTableScan("random_tbl1")))
-                            .and(input(2, isTableScan("random_tbl2")))
-                    ))
-            );
-        }
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+                        .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
+                        .and(input(1, isInstanceOf(setOp.reduce)
+                                .and(hasChildThat(isInstanceOf(setOp.map)
+                                        .and(input(0, isTableScan("random_tbl1")))
+                                        .and(input(1, isTableScan("random_tbl2")))
+                                ))
+                        )),
+                "IntersectMergeRule"
+        );
     }
 
     /**
-     * TestSetOpBroadcastAndRandomNested.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests two SET operations (nested and outer) on three tables with two random (nested) and one broadcast (outer) distribution.
+     *
+     * <p>Nested operation cannot be colocated because {@link Type#RANDOM_DISTRIBUTED random}
+     * distribution cannot be colocated with other random distribution.
+     * Outer operation is considered colocated because {@link Type#BROADCAST_DISTRIBUTED broadcast}
+     * distribution satisfies any other distribution.
      *
      * @throws Exception If failed.
      */
@@ -305,31 +432,20 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
                 + "   SELECT * FROM random_tbl2"
                 + ")";
 
-        if (setOp == SetOp.EXCEPT) {
-            assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
-                    .and(input(0, isTableScan("broadcast_tbl1")))
-                    .and(input(1, isInstanceOf(setOp.reduce)
-                            .and(hasChildThat(isInstanceOf(setOp.map)
-                                    .and(input(0, isTableScan("random_tbl1")))
-                                    .and(input(1, isTableScan("random_tbl2")))
-                            ))
-                    ))
-            );
-        } else {
-            // INTERSECT operator is commutative and can be merged.
-            assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
-                    .and(hasChildThat(isInstanceOf(setOp.map)
-                            .and(input(0, nodeOrAnyChild(isTableScan("broadcast_tbl1"))))
-                            .and(input(1, isTableScan("random_tbl1")))
-                            .and(input(2, isTableScan("random_tbl2")))
-                    ))
-            );
-        }
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.colocated)
+                        .and(input(0, isTableScan("broadcast_tbl1")))
+                        .and(input(1, isInstanceOf(setOp.reduce)
+                                .and(hasChildThat(isInstanceOf(setOp.map)
+                                        .and(input(0, isTableScan("random_tbl1")))
+                                        .and(input(1, isTableScan("random_tbl2")))
+                                ))
+                        )),
+                "IntersectMergeRule"
+        );
     }
 
     /**
-     * TestSetOpMerge.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests multiple SET operations on multiple tables with affinity and random distribution.
      *
      * @throws Exception If failed.
      */
@@ -355,8 +471,7 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
     }
 
     /**
-     * TestSetOpAllMerge.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests multiple SET operations (with ALL flag enabled) on multiple tables with affinity and random distribution.
      *
      * @throws Exception If failed.
      */
@@ -382,8 +497,7 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
     }
 
     /**
-     * TestSetOpAllWithExceptMerge.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Tests two SET operations (with ALL flag enabled for the first one) on tables with affinity and random distribution.
      *
      * @throws Exception If failed.
      */
@@ -415,28 +529,28 @@ public class SetOpPlannerTest extends AbstractPlannerTest {
 
     enum SetOp {
         EXCEPT(
-                IgniteSingleMinus.class,
+                IgniteColocatedMinus.class,
                 IgniteMapMinus.class,
                 IgniteReduceMinus.class
         ),
 
         INTERSECT(
-                IgniteSingleIntersect.class,
+                IgniteColocatedIntersect.class,
                 IgniteMapIntersect.class,
                 IgniteReduceIntersect.class
         );
 
-        public final Class<? extends IgniteSingleSetOp> single;
+        public final Class<? extends IgniteColocatedSetOp> colocated;
 
         public final Class<? extends IgniteMapSetOp> map;
 
         public final Class<? extends IgniteReduceSetOp> reduce;
 
         SetOp(
-                Class<? extends IgniteSingleSetOp> single,
+                Class<? extends IgniteColocatedSetOp> colocated,
                 Class<? extends IgniteMapSetOp> map,
                 Class<? extends IgniteReduceSetOp> reduce) {
-            this.single = single;
+            this.colocated = colocated;
             this.map = map;
             this.reduce = reduce;
         }