You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/05/04 06:38:43 UTC

[ignite] branch sql-calcite updated: IGNITE-14588 Fix pass-through-distribution traits for single and reduce aggregates - Fixes #9039.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 32e85c3  IGNITE-14588 Fix pass-through-distribution traits for single and reduce aggregates - Fixes #9039.
32e85c3 is described below

commit 32e85c3644fb7b9889cb1e226dbf789e7178fa82
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue May 4 09:35:31 2021 +0300

    IGNITE-14588 Fix pass-through-distribution traits for single and reduce aggregates - Fixes #9039.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../calcite/rel/agg/IgniteReduceAggregateBase.java | 11 +++++-
 .../calcite/rel/agg/IgniteSingleAggregateBase.java | 11 +++++-
 .../query/calcite/CalciteQueryProcessorTest.java   | 36 +++++++++++++++++++
 .../calcite/planner/AggregatePlannerTest.java      | 41 +++++++++++++++++++++-
 4 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
index 0c3f171..33dbca6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
@@ -39,6 +38,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
@@ -123,6 +123,15 @@ public abstract class IgniteReduceAggregateBase extends SingleRel implements Tra
     }
 
     /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        if (TraitUtils.distribution(nodeTraits) == IgniteDistributions.single())
+            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
index 348f4b6..aa2cb95 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.agg;
 
 import java.util.List;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -32,6 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
@@ -55,6 +55,15 @@ public abstract class IgniteSingleAggregateBase extends IgniteAggregate implemen
     }
 
     /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        if (TraitUtils.distribution(nodeTraits) == IgniteDistributions.single())
+            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index cd14468..9b0195b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -896,6 +896,42 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     /** */
     @Test
+    public void aggregateNested() throws Exception {
+        String cacheName = "employer";
+
+        IgniteCache<Integer, Employer> employer = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+            .setName(cacheName)
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Employer.class)
+            .setBackups(2)
+        );
+
+        awaitPartitionMapExchange(true, true, null);
+
+        List<Integer> keysNode0 = primaryKeys(grid(0).cache(cacheName), 2);
+        List<Integer> keysNode1 = primaryKeys(grid(1).cache(cacheName), 1);
+
+        employer.putAll(ImmutableMap.of(
+            keysNode0.get(0), new Employer("Igor", 10d),
+            keysNode0.get(1), new Employer("Roman", 20d) ,
+            keysNode1.get(0), new Employer("Nikolay", 30d)
+        ));
+
+        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> qry = engine.query(null, "PUBLIC",
+            "SELECT avg(salary) FROM " +
+                "(SELECT avg(salary) as salary FROM employer UNION ALL SELECT salary FROM employer)");
+
+        assertEquals(1, qry.size());
+
+        List<List<?>> rows = qry.get(0).getAll();
+        assertEquals(1, rows.size());
+        assertEquals(20d, F.first(F.first(rows)));
+    }
+
+    /** */
+    @Test
     public void query() throws Exception {
         IgniteCache<Integer, Developer> developer = grid(1).createCache(new CacheConfiguration<Integer, Developer>()
             .setName("developer")
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
index 1d36838..d1682e0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.sql.SqlExplainLevel;
@@ -39,6 +38,8 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleA
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.Assert;
@@ -170,6 +171,44 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
             assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
     }
 
+    /**
+     * Test that aggregate has single distribution output even if parent node accept random distibution inputs.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void distribution() throws Exception {
+        TestTable tbl = createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(3)), "grp0");
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", tbl);
+
+        String sql = "SELECT AVG(val0), grp0 FROM TEST GROUP BY grp0 UNION ALL SELECT val0, grp0 FROM test";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            F.concat(algo.rulesToDisable, "SortMapReduceAggregateConverterRule",
+                "HashMapReduceAggregateConverterRule")
+        );
+
+        IgniteSingleAggregateBase singleAgg = findFirstNode(phys, byClass(algo.single));
+
+        assertEquals(IgniteDistributions.single(), TraitUtils.distribution(singleAgg));
+
+        phys = physicalPlan(
+            sql,
+            publicSchema,
+            F.concat(algo.rulesToDisable, "SortSingleAggregateConverterRule",
+                "HashSingleAggregateConverterRule")
+        );
+
+        IgniteReduceAggregateBase rdcAgg = findFirstNode(phys, byClass(algo.reduce));
+
+        assertEquals(IgniteDistributions.single(), TraitUtils.distribution(rdcAgg));
+    }
+
     /** */
     enum AggregateAlgorithm {
         /** */