You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by hy...@apache.org on 2019/03/28 20:06:44 UTC

[calcite] branch master updated: [CALCITE-2658] Add ExchangeRemoveConstantKeysRule that removes constant keys from Exchange or SortExchange (Chunwei Lei)

This is an automated email from the ASF dual-hosted git repository.

hyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0537f27  [CALCITE-2658] Add ExchangeRemoveConstantKeysRule that removes constant keys from Exchange or SortExchange (Chunwei Lei)
0537f27 is described below

commit 0537f27d87598dea888050a4cc63cf4d6fe027d7
Author: chunwei.lcw <ch...@163.com>
AuthorDate: Thu Nov 15 20:40:59 2018 +0800

    [CALCITE-2658] Add ExchangeRemoveConstantKeysRule that removes constant keys from Exchange or SortExchange (Chunwei Lei)
---
 .../apache/calcite/prepare/CalcitePrepareImpl.java |   5 +-
 .../rel/rules/ExchangeRemoveConstantKeysRule.java  | 209 +++++++++++++++++++++
 .../org/apache/calcite/test/RelOptRulesTest.java   |  36 ++++
 .../org/apache/calcite/test/RelOptRulesTest.xml    |  20 ++
 4 files changed, 269 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
index d5fee2d..2c08b76 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
@@ -77,6 +77,7 @@ import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
 import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
 import org.apache.calcite.rel.rules.AggregateStarTableRule;
 import org.apache.calcite.rel.rules.AggregateValuesRule;
+import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
 import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
 import org.apache.calcite.rel.rules.FilterJoinRule;
 import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
@@ -234,7 +235,9 @@ public class CalcitePrepareImpl implements CalcitePrepare {
           SortProjectTransposeRule.INSTANCE,
           SortJoinTransposeRule.INSTANCE,
           SortRemoveConstantKeysRule.INSTANCE,
-          SortUnionTransposeRule.INSTANCE);
+          SortUnionTransposeRule.INSTANCE,
+          ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE,
+          ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE);
 
   private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
       ImmutableList.of(
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ExchangeRemoveConstantKeysRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ExchangeRemoveConstantKeysRule.java
new file mode 100644
index 0000000..2794792
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ExchangeRemoveConstantKeysRule.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Planner rule that removes keys from
+ * an {@link Exchange} if those keys are known to be constant.
+ *
+ * <p>For example,
+ * <code>SELECT key,value FROM (SELECT 1 AS key, value FROM src) r DISTRIBUTE
+ * BY key</code> can be reduced to
+ * <code>SELECT 1 AS key, value FROM src</code>.</p>
+ * <p>When every key is constant, the exchange is kept with a singleton distribution.</p>
+ */
+public class ExchangeRemoveConstantKeysRule extends RelOptRule {
+  /**
+   * Singleton rule that removes constant keys from the hash distribution
+   * of a {@link LogicalExchange}.
+   */
+  public static final ExchangeRemoveConstantKeysRule EXCHANGE_INSTANCE =
+      new ExchangeRemoveConstantKeysRule(LogicalExchange.class,
+          "ExchangeRemoveConstantKeysRule");
+
+  /**
+   * Singleton rule that removes constant keys from the hash distribution
+   * and from the collation of a {@link LogicalSortExchange}.
+   */
+  public static final ExchangeRemoveConstantKeysRule SORT_EXCHANGE_INSTANCE =
+      new SortExchangeRemoveConstantKeysRule(LogicalSortExchange.class,
+          "SortExchangeRemoveConstantKeysRule");
+
+  private ExchangeRemoveConstantKeysRule(Class<? extends RelNode> clazz,
+      String description) {
+    super(operand(clazz, any()), RelFactories.LOGICAL_BUILDER, description);
+  }
+
+  /** Removes constant in distribution keys. */
+  protected static List<Integer> simplifyDistributionKeys(RelDistribution distribution,
+      Set<Integer> constants) {
+    return distribution.getKeys().stream()
+        .filter(key -> !constants.contains(key))
+        .collect(Collectors.toList());
+  }
+
+  @Override public boolean matches(RelOptRuleCall call) {
+    final Exchange exchange = call.rel(0);
+    final RelDistribution.Type type = exchange.getDistribution().getType();
+    return type == RelDistribution.Type.HASH_DISTRIBUTED; // only hash distributions are handled
+  }
+
+  @Override public void onMatch(RelOptRuleCall call) {
+    final Exchange exchange = call.rel(0);
+    final RelMetadataQuery metadataQuery = call.getMetadataQuery();
+    final RelOptPredicateList pulledUp =
+        metadataQuery.getPulledUpPredicates(exchange.getInput());
+    if (pulledUp == null) {
+      return;
+    }
+
+    // Indexes of the input refs that appear as keys of the constant map.
+    final Set<Integer> constantColumns = new HashSet<>();
+    for (Object key : pulledUp.constantMap.keySet()) {
+      if (key instanceof RexInputRef) {
+        constantColumns.add(((RexInputRef) key).getIndex());
+      }
+    }
+    if (constantColumns.isEmpty()) {
+      return;
+    }
+
+    final List<Integer> remainingKeys = simplifyDistributionKeys(
+        exchange.getDistribution(), constantColumns);
+    if (remainingKeys.size() == exchange.getDistribution().getKeys().size()) {
+      return; // none of the distribution keys is constant
+    }
+
+    // If every key was constant, fall back to a singleton distribution.
+    final RelDistribution reduced = remainingKeys.isEmpty()
+        ? RelDistributions.SINGLETON
+        : RelDistributions.hash(remainingKeys);
+
+    call.transformTo(
+        call.builder().push(exchange.getInput()).exchange(reduced).build());
+    call.getPlanner().setImportance(exchange, 0.0);
+  }
+
+  /**
+   * Rule that removes constant hash and sort keys from a {@link SortExchange}.
+   */
+  public static class SortExchangeRemoveConstantKeysRule
+      extends ExchangeRemoveConstantKeysRule {
+
+    private SortExchangeRemoveConstantKeysRule(Class<? extends RelNode> clazz,
+            String description) {
+      super(clazz, description); // operand and builder factory are set up by the parent
+    }
+
+    @Override public boolean matches(RelOptRuleCall call) {
+      final SortExchange rel = call.rel(0);
+      final boolean hashed = rel.getDistribution().getType()
+          == RelDistribution.Type.HASH_DISTRIBUTED;
+      return hashed || !rel.getCollation().getFieldCollations().isEmpty();
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final SortExchange sortExchange = call.rel(0);
+      final RelMetadataQuery metadataQuery = call.getMetadataQuery();
+      final RelOptPredicateList pulledUp =
+          metadataQuery.getPulledUpPredicates(sortExchange.getInput());
+      if (pulledUp == null) {
+        return;
+      }
+
+      // Indexes of the input refs that appear as keys of the constant map.
+      final Set<Integer> constantColumns = new HashSet<>();
+      for (Object key : pulledUp.constantMap.keySet()) {
+        if (key instanceof RexInputRef) {
+          constantColumns.add(((RexInputRef) key).getIndex());
+        }
+      }
+      if (constantColumns.isEmpty()) {
+        return;
+      }
+
+      // Distribution keys are only simplified for hash distributions.
+      final RelDistribution oldDistribution = sortExchange.getDistribution();
+      final List<Integer> remainingKeys;
+      final boolean distributionSimplified;
+      if (oldDistribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) {
+        remainingKeys = simplifyDistributionKeys(oldDistribution, constantColumns);
+        distributionSimplified =
+            remainingKeys.size() != oldDistribution.getKeys().size();
+      } else {
+        remainingKeys = new ArrayList<>();
+        distributionSimplified = false;
+      }
+
+      // Keep only the sort fields whose field index is not a constant column.
+      final RelCollation oldCollation = sortExchange.getCollation();
+      final List<RelFieldCollation> remainingFields = new ArrayList<>();
+      for (RelFieldCollation field : oldCollation.getFieldCollations()) {
+        if (!constantColumns.contains(field.getFieldIndex())) {
+          remainingFields.add(field);
+        }
+      }
+      final boolean collationSimplified =
+          remainingFields.size() != oldCollation.getFieldCollations().size();
+      if (!distributionSimplified && !collationSimplified) {
+        return;
+      }
+
+      final RelDistribution distribution;
+      if (!distributionSimplified) {
+        distribution = oldDistribution;
+      } else if (remainingKeys.isEmpty()) {
+        distribution = RelDistributions.SINGLETON;
+      } else {
+        distribution = RelDistributions.hash(remainingKeys);
+      }
+      final RelCollation collation =
+          collationSimplified ? RelCollations.of(remainingFields) : oldCollation;
+
+      call.transformTo(call.builder()
+          .push(sortExchange.getInput())
+          .sortExchange(distribution, collation)
+          .build());
+      call.getPlanner().setImportance(sortExchange, 0.0);
+    }
+  }
+}
+
+// End ExchangeRemoveConstantKeysRule.java
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index c612cad..b4ce16e 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -30,6 +30,9 @@ import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.core.Aggregate;
@@ -62,6 +65,7 @@ import org.apache.calcite.rel.rules.AggregateValuesRule;
 import org.apache.calcite.rel.rules.CalcMergeRule;
 import org.apache.calcite.rel.rules.CoerceInputsRule;
 import org.apache.calcite.rel.rules.DateRangeRules;
+import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
 import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
 import org.apache.calcite.rel.rules.FilterJoinRule;
 import org.apache.calcite.rel.rules.FilterMergeRule;
@@ -4311,6 +4315,38 @@ public class RelOptRulesTest extends RelOptTestBase {
     checkPlanning(program, sql);
   }
 
+  @Test public void testExchangeRemoveConstantKeysRule() {
+    final DiffRepository diffRepos = getDiffRepos();
+    final RelBuilder builder = RelBuilder.create(RelBuilderTest.config().build());
+    RelNode root = builder
+        .scan("EMP")
+        .filter(
+            builder.call(SqlStdOperatorTable.EQUALS,
+                builder.field("EMPNO"), builder.literal(10))) // pins EMPNO ($0) to 10
+        .exchange(RelDistributions.hash(ImmutableList.of(0))) // sole key is constant
+        .project(builder.field(0), builder.field(1))
+        .sortExchange(RelDistributions.hash(ImmutableList.of(0, 1)), // key 0 is constant
+            RelCollations.of(new RelFieldCollation(0), new RelFieldCollation(1)))
+        .build();
+
+    HepProgram preProgram = new HepProgramBuilder().build(); // no rules added
+    HepPlanner prePlanner = new HepPlanner(preProgram);
+    prePlanner.setRoot(root);
+    final RelNode relBefore = prePlanner.findBestExp();
+    final String planBefore = NL + RelOptUtil.toString(relBefore);
+    diffRepos.assertEquals("planBefore", "${planBefore}", planBefore);
+
+    HepProgram hepProgram = new HepProgramBuilder()
+        .addRuleInstance(ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE)
+        .addRuleInstance(ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE)
+        .build();
+
+    HepPlanner hepPlanner = new HepPlanner(hepProgram); // same root, both rules enabled
+    hepPlanner.setRoot(root);
+    final RelNode relAfter = hepPlanner.findBestExp();
+    final String planAfter = NL + RelOptUtil.toString(relAfter);
+    diffRepos.assertEquals("planAfter", "${planAfter}", planAfter);
+  }
 }
 
 // End RelOptRulesTest.java
diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
index cea2776..45af69c 100644
--- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
@@ -8854,4 +8854,24 @@ LogicalProject(COL1=[SUM(100) OVER (PARTITION BY $7, $5 ORDER BY $5 RANGE BETWEE
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testExchangeRemoveConstantKeysRule">
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalSortExchange(distribution=[hash[0, 1]], collation=[[0, 1]])
+  LogicalProject(EMPNO=[$0], ENAME=[$1])
+    LogicalExchange(distribution=[hash[0]])
+      LogicalFilter(condition=[=($0, 10)])
+        LogicalTableScan(table=[[scott, EMP]])
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+LogicalSortExchange(distribution=[hash[1]], collation=[[1]])
+  LogicalProject(EMPNO=[$0], ENAME=[$1])
+    LogicalExchange(distribution=[single])
+      LogicalFilter(condition=[=($0, 10)])
+        LogicalTableScan(table=[[scott, EMP]])
+]]>
+        </Resource>
+    </TestCase>
 </Root>