You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/05/04 06:38:43 UTC
[ignite] branch sql-calcite updated: IGNITE-14588 Fix
pass-through-distribution traits for single and reduce aggregates - Fixes
#9039.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 32e85c3 IGNITE-14588 Fix pass-through-distribution traits for single and reduce aggregates - Fixes #9039.
32e85c3 is described below
commit 32e85c3644fb7b9889cb1e226dbf789e7178fa82
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue May 4 09:35:31 2021 +0300
IGNITE-14588 Fix pass-through-distribution traits for single and reduce aggregates - Fixes #9039.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../calcite/rel/agg/IgniteReduceAggregateBase.java | 11 +++++-
.../calcite/rel/agg/IgniteSingleAggregateBase.java | 11 +++++-
.../query/calcite/CalciteQueryProcessorTest.java | 36 +++++++++++++++++++
.../calcite/planner/AggregatePlannerTest.java | 41 +++++++++++++++++++++-
4 files changed, 96 insertions(+), 3 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
index 0c3f171..33dbca6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
@@ -39,6 +38,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
@@ -123,6 +123,15 @@ public abstract class IgniteReduceAggregateBase extends SingleRel implements Tra
}
/** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        // Only a request for single distribution is handled here; anything else yields null.
+        // NOTE(review): null presumably tells the caller that pass-through is not possible - confirm
+        // against the TraitsAwareIgniteRel contract.
+        if (TraitUtils.distribution(nodeTraits) != IgniteDistributions.single())
+            return null;
+
+        // Node traits are returned untouched; every input trait set gets its distribution replaced by single.
+        List<RelTraitSet> singleInTraits = Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single()));
+
+        return Pair.of(nodeTraits, singleInTraits);
+    }
+
+ /** {@inheritDoc} */
@Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
index 348f4b6..aa2cb95 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -32,6 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
@@ -55,6 +55,15 @@ public abstract class IgniteSingleAggregateBase extends IgniteAggregate implemen
}
/** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        // Only a request for single distribution is handled here; anything else yields null.
+        // NOTE(review): null presumably tells the caller that pass-through is not possible - confirm
+        // against the TraitsAwareIgniteRel contract.
+        if (TraitUtils.distribution(nodeTraits) != IgniteDistributions.single())
+            return null;
+
+        // Node traits are returned untouched; every input trait set gets its distribution replaced by single.
+        List<RelTraitSet> singleInTraits = Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single()));
+
+        return Pair.of(nodeTraits, singleInTraits);
+    }
+
+ /** {@inheritDoc} */
@Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index cd14468..9b0195b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -896,6 +896,42 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
@Test
+    public void aggregateNested() throws Exception {
+        final String cacheName = "employer";
+
+        CacheConfiguration<Integer, Employer> ccfg = new CacheConfiguration<Integer, Employer>()
+            .setName(cacheName)
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Employer.class)
+            .setBackups(2);
+
+        IgniteCache<Integer, Employer> cache = client.getOrCreateCache(ccfg);
+
+        awaitPartitionMapExchange(true, true, null);
+
+        // Two keys taken from primaryKeys() for grid(0) and one for grid(1).
+        List<Integer> node0Keys = primaryKeys(grid(0).cache(cacheName), 2);
+        List<Integer> node1Keys = primaryKeys(grid(1).cache(cacheName), 1);
+
+        cache.putAll(ImmutableMap.of(
+            node0Keys.get(0), new Employer("Igor", 10d),
+            node0Keys.get(1), new Employer("Roman", 20d),
+            node1Keys.get(0), new Employer("Nikolay", 30d)
+        ));
+
+        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        // Outer aggregate over a UNION ALL of a nested aggregate and the plain salary column.
+        String sql = "SELECT avg(salary) FROM " +
+            "(SELECT avg(salary) as salary FROM employer UNION ALL SELECT salary FROM employer)";
+
+        List<FieldsQueryCursor<List<?>>> cursors = engine.query(null, "PUBLIC", sql);
+
+        assertEquals(1, cursors.size());
+
+        List<List<?>> res = cursors.get(0).getAll();
+
+        assertEquals(1, res.size());
+
+        // Inner avg(10, 20, 30) = 20, so the outer avg runs over {20, 10, 20, 30} and is 20 as well.
+        assertEquals(20d, F.first(F.first(res)));
+    }
+
+ /** */
+ @Test
public void query() throws Exception {
IgniteCache<Integer, Developer> developer = grid(1).createCache(new CacheConfiguration<Integer, Developer>()
.setName("developer")
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
index 1d36838..d1682e0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -39,6 +38,8 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleA
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
@@ -170,6 +171,44 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
}
+    /**
+     * Checks that an aggregate produces single-distributed output even when its parent node accepts
+     * randomly distributed inputs.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void distribution() throws Exception {
+        TestTable affinityTbl = createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(3)), "grp0");
+
+        IgniteSchema schema = new IgniteSchema("PUBLIC");
+
+        schema.addTable("TEST", affinityTbl);
+
+        String sql = "SELECT AVG(val0), grp0 FROM TEST GROUP BY grp0 UNION ALL SELECT val0, grp0 FROM test";
+
+        // First plan: both map-reduce aggregate converter rules are disabled on top of the algorithm's own list.
+        IgniteRel singlePlan = physicalPlan(
+            sql,
+            schema,
+            F.concat(algo.rulesToDisable, "SortMapReduceAggregateConverterRule",
+                "HashMapReduceAggregateConverterRule")
+        );
+
+        IgniteSingleAggregateBase singleAgg = findFirstNode(singlePlan, byClass(algo.single));
+
+        assertEquals(IgniteDistributions.single(), TraitUtils.distribution(singleAgg));
+
+        // Second plan: both single aggregate converter rules are disabled instead.
+        IgniteRel reducePlan = physicalPlan(
+            sql,
+            schema,
+            F.concat(algo.rulesToDisable, "SortSingleAggregateConverterRule",
+                "HashSingleAggregateConverterRule")
+        );
+
+        IgniteReduceAggregateBase rdcAgg = findFirstNode(reducePlan, byClass(algo.reduce));
+
+        assertEquals(IgniteDistributions.single(), TraitUtils.distribution(rdcAgg));
+    }
+
/** */
enum AggregateAlgorithm {
/** */