You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2018/02/21 19:14:29 UTC

calcite git commit: [CALCITE-2179] General improvements for materialized view rewriting rule

Repository: calcite
Updated Branches:
  refs/heads/master a78400f43 -> aa25dcbe5


[CALCITE-2179] General improvements for materialized view rewriting rule

Close apache/calcite#630


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/aa25dcbe
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/aa25dcbe
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/aa25dcbe

Branch: refs/heads/master
Commit: aa25dcbe565196fb6b78149042ee817427ed4f68
Parents: a78400f
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Fri Feb 16 17:43:02 2018 -0800
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Feb 21 11:14:14 2018 -0800

----------------------------------------------------------------------
 .../rel/rules/AbstractMaterializedViewRule.java | 815 ++++++++++++++-----
 .../AggregateProjectPullUpConstantsRule.java    |   9 +-
 .../java/org/apache/calcite/rex/RexCopier.java  |   3 +-
 .../java/org/apache/calcite/test/JdbcTest.java  |  20 +
 .../calcite/test/MaterializationTest.java       | 111 ++-
 5 files changed, 756 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/aa25dcbe/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
index 1286224..a86f382 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.rel.rules;
 
+import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.plan.RelOptMaterialization;
 import org.apache.calcite.plan.RelOptMaterializations;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -25,6 +26,9 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.SubstitutionVisitor;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelReferentialConstraint;
@@ -44,17 +48,22 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexExecutor;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPermuteInputsShuttle;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexTableInputRef;
 import org.apache.calcite.rex.RexTableInputRef.RelTableRef;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilder.AggCall;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.graph.DefaultDirectedGraph;
 import org.apache.calcite.util.graph.DefaultEdge;
@@ -67,6 +76,7 @@ import org.apache.calcite.util.trace.CalciteLogger;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
@@ -101,37 +111,54 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       new CalciteLogger(LoggerFactory.getLogger(AbstractMaterializedViewRule.class));
 
   public static final MaterializedViewProjectFilterRule INSTANCE_PROJECT_FILTER =
-      new MaterializedViewProjectFilterRule(RelFactories.LOGICAL_BUILDER, true);
+      new MaterializedViewProjectFilterRule(RelFactories.LOGICAL_BUILDER,
+          true, null, true);
 
   public static final MaterializedViewOnlyFilterRule INSTANCE_FILTER =
-      new MaterializedViewOnlyFilterRule(RelFactories.LOGICAL_BUILDER, true);
+      new MaterializedViewOnlyFilterRule(RelFactories.LOGICAL_BUILDER,
+          true, null, true);
 
   public static final MaterializedViewProjectJoinRule INSTANCE_PROJECT_JOIN =
-      new MaterializedViewProjectJoinRule(RelFactories.LOGICAL_BUILDER, true);
+      new MaterializedViewProjectJoinRule(RelFactories.LOGICAL_BUILDER,
+          true, null, true);
 
   public static final MaterializedViewOnlyJoinRule INSTANCE_JOIN =
-      new MaterializedViewOnlyJoinRule(RelFactories.LOGICAL_BUILDER, true);
+      new MaterializedViewOnlyJoinRule(RelFactories.LOGICAL_BUILDER,
+          true, null, true);
 
   public static final MaterializedViewProjectAggregateRule INSTANCE_PROJECT_AGGREGATE =
-      new MaterializedViewProjectAggregateRule(RelFactories.LOGICAL_BUILDER, true);
+      new MaterializedViewProjectAggregateRule(RelFactories.LOGICAL_BUILDER,
+          true, null);
 
   public static final MaterializedViewOnlyAggregateRule INSTANCE_AGGREGATE =
-      new MaterializedViewOnlyAggregateRule(RelFactories.LOGICAL_BUILDER, true);
+      new MaterializedViewOnlyAggregateRule(RelFactories.LOGICAL_BUILDER,
+          true, null);
 
   //~ Instance fields --------------------------------------------------------
 
   /** Whether to generate rewritings containing union if the query results
    * are contained within the view results. */
-  private final boolean generateUnionRewriting;
+  protected final boolean generateUnionRewriting;
+
+  /** If we generate union rewriting, we might want to pull up projections
+   * from the query itself to maximize rewriting opportunities. */
+  protected final HepProgram unionRewritingPullProgram;
+
+  /** Whether we should create the rewriting in the minimal subtree of plan
+   * operators. */
+  protected final boolean fastBailOut;
 
   //~ Constructors -----------------------------------------------------------
 
   /** Creates a AbstractMaterializedViewRule. */
   protected AbstractMaterializedViewRule(RelOptRuleOperand operand,
       RelBuilderFactory relBuilderFactory, String description,
-      boolean generateUnionRewriting) {
+      boolean generateUnionRewriting, HepProgram unionRewritingPullProgram,
+      boolean fastBailOut) {
     super(operand, relBuilderFactory, description);
     this.generateUnionRewriting = generateUnionRewriting;
+    this.unionRewritingPullProgram = unionRewritingPullProgram;
+    this.fastBailOut = fastBailOut;
   }
 
   /**
@@ -420,7 +447,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
               // We trigger the unifying method. This method will either create a Project
               // or an Aggregate operator on top of the view. It will also compute the
               // output expressions for the query.
-              final RelNode unionInputView = rewriteView(call.builder(), rexBuilder, mq,
+              final RelNode unionInputView = rewriteView(call.builder(), rexBuilder, simplify, mq,
                   matchModality, true, view, topProject, node, topViewProject, viewNode,
                   queryToViewTableMapping, currQEC);
               if (unionInputView == null) {
@@ -488,12 +515,22 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
               // Project or an Aggregate operator on top of the view. It will also compute
               // the output expressions for the query.
               RelBuilder builder = call.builder();
-              builder.push(view);
+              RelNode viewWithFilter;
               if (!viewCompensationPred.isAlwaysTrue()) {
-                builder.filter(simplify.simplify(viewCompensationPred));
+                RexNode newPred = simplify.simplify(viewCompensationPred);
+                viewWithFilter = builder.push(view).filter(newPred).build();
+                // We add (and push) the filter to the view plan before triggering the rewriting.
+                // This is useful in case some of the columns can be folded to same value after
+                // filter is added.
+                Pair<RelNode, RelNode> pushedNodes =
+                    pushFilterToOriginalViewPlan(builder, topViewProject, viewNode, newPred);
+                topViewProject = (Project) pushedNodes.left;
+                viewNode = pushedNodes.right;
+              } else {
+                viewWithFilter = builder.push(view).build();
               }
-              final RelNode result = rewriteView(builder, rexBuilder, mq, matchModality, false,
-                  builder.build(), topProject, node, topViewProject, viewNode,
+              final RelNode result = rewriteView(builder, rexBuilder, simplify, mq, matchModality,
+                  false, viewWithFilter, topProject, node, topViewProject, viewNode,
                   queryToViewTableMapping, currQEC);
               if (result == null) {
                 // Skip it
@@ -552,12 +589,25 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
    * be produced, we will return null.
    */
   protected abstract RelNode rewriteView(RelBuilder relBuilder, RexBuilder rexBuilder,
-      RelMetadataQuery mq, MatchModality matchModality, boolean unionRewriting, RelNode input,
+      RexSimplify simplify, RelMetadataQuery mq, MatchModality matchModality,
+      boolean unionRewriting, RelNode input,
       Project topProject, RelNode node,
       Project topViewProject, RelNode viewNode,
       BiMap<RelTableRef, RelTableRef> queryToViewTableMapping,
       EquivalenceClasses queryEC);
 
+  /**
+   * Once we create a compensation predicate, this method is responsible for pushing
+   * the resulting filter through the view nodes. This might be useful for rewritings
+   * containing Aggregate operators, as some of the grouping columns might be removed,
+   * which results in additional matching possibilities.
+   *
+   * <p>The method will return a pair of nodes: the new top project on the left and
+   * the new node on the right.
+   */
+  protected abstract Pair<RelNode, RelNode> pushFilterToOriginalViewPlan(RelBuilder builder,
+      RelNode topViewProject, RelNode viewNode, RexNode cond);
+
   //~ Instances Join ---------------------------------------------------------
 
   /** Materialized view rewriting for join */
@@ -566,8 +616,10 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
     /** Creates a MaterializedViewJoinRule. */
     protected MaterializedViewJoinRule(RelOptRuleOperand operand,
         RelBuilderFactory relBuilderFactory, String description,
-        boolean generateUnionRewriting) {
-      super(operand, relBuilderFactory, description, generateUnionRewriting);
+        boolean generateUnionRewriting, HepProgram unionRewritingPullProgram,
+        boolean fastBailOut) {
+      super(operand, relBuilderFactory, description, generateUnionRewriting,
+          unionRewritingPullProgram, fastBailOut);
     }
 
     @Override protected boolean isValidPlan(Project topProject, RelNode node,
@@ -596,9 +648,11 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       // ((A JOIN B) JOIN D) JOIN C
       // But not at:
       // (((A JOIN B) JOIN D) JOIN C) JOIN E
-      for (RelNode joinInput : node.getInputs()) {
-        if (mq.getTableReferences(joinInput).containsAll(viewTableRefs)) {
-          return null;
+      if (fastBailOut) {
+        for (RelNode joinInput : node.getInputs()) {
+          if (mq.getTableReferences(joinInput).containsAll(viewTableRefs)) {
+            return null;
+          }
         }
       }
 
@@ -661,13 +715,30 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
         RelNode node,
         BiMap<RelTableRef, RelTableRef> viewToQueryTableMapping,
         EquivalenceClasses viewEC, EquivalenceClasses queryEC) {
+      // Our target node is the node below the root, which should have the maximum
+      // number of available expressions in the tree in order to maximize our
+      // number of rewritings.
+      // We create a project on top. If the program is available, we execute
+      // it to maximize rewriting opportunities. For instance, a program might
+      // pull up all the expressions that are below the aggregate so we can
+      // introduce compensation filters easily. This is important depending on
+      // the planner strategy.
+      RelNode newNode = node;
+      RelNode target = node;
+      if (unionRewritingPullProgram != null) {
+        final HepPlanner tmpPlanner = new HepPlanner(unionRewritingPullProgram);
+        tmpPlanner.setRoot(newNode);
+        newNode = tmpPlanner.findBestExp();
+        target = newNode.getInput(0);
+      }
+
       // All columns required by compensating predicates must be contained
       // in the query.
-      List<RexNode> queryExprs = extractReferences(rexBuilder, node);
+      List<RexNode> queryExprs = extractReferences(rexBuilder, target);
 
       if (!compensationColumnsEquiPred.isAlwaysTrue()) {
         compensationColumnsEquiPred = rewriteExpression(rexBuilder, mq,
-            node, queryExprs, viewToQueryTableMapping.inverse(), queryEC, false,
+            target, queryExprs, viewToQueryTableMapping.inverse(), queryEC, false,
             compensationColumnsEquiPred);
         if (compensationColumnsEquiPred == null) {
           // Skip it
@@ -677,7 +748,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       // For the rest, we use the query equivalence classes
       if (!otherCompensationPred.isAlwaysTrue()) {
         otherCompensationPred = rewriteExpression(rexBuilder, mq,
-            node, queryExprs, viewToQueryTableMapping.inverse(), viewEC, true,
+            target, queryExprs, viewToQueryTableMapping.inverse(), viewEC, true,
             otherCompensationPred);
         if (otherCompensationPred == null) {
           // Skip it
@@ -693,12 +764,18 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
               false));
 
       // Generate query rewriting.
-      relBuilder.push(node);
-      relBuilder.filter(simplify.simplify(queryCompensationPred));
+      RelNode rewrittenPlan = relBuilder
+          .push(target)
+          .filter(simplify.simplify(queryCompensationPred))
+          .build();
+      if (unionRewritingPullProgram != null) {
+        rewrittenPlan = newNode.copy(
+            newNode.getTraitSet(), ImmutableList.of(rewrittenPlan));
+      }
       if (topProject != null) {
-        return topProject.copy(topProject.getTraitSet(), ImmutableList.of(relBuilder.build()));
+        return topProject.copy(topProject.getTraitSet(), ImmutableList.of(rewrittenPlan));
       }
-      return relBuilder.build();
+      return rewrittenPlan;
     }
 
     @Override protected RelNode createUnion(RelBuilder relBuilder, RexBuilder rexBuilder,
@@ -725,6 +802,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
     @Override protected RelNode rewriteView(
         RelBuilder relBuilder,
         RexBuilder rexBuilder,
+        RexSimplify simplify,
         RelMetadataQuery mq,
         MatchModality matchModality,
         boolean unionRewriting,
@@ -765,18 +843,25 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
           .project(rewrittenExprs)
           .build();
     }
+
+    @Override public Pair<RelNode, RelNode> pushFilterToOriginalViewPlan(RelBuilder builder,
+        RelNode topViewProject, RelNode viewNode, RexNode cond) {
+      // Nothing to do
+      return Pair.of(topViewProject, viewNode);
+    }
   }
 
   /** Rule that matches Project on Join. */
   public static class MaterializedViewProjectJoinRule extends MaterializedViewJoinRule {
     public MaterializedViewProjectJoinRule(RelBuilderFactory relBuilderFactory,
-            boolean generateUnionRewriting) {
+            boolean generateUnionRewriting, HepProgram unionRewritingPullProgram,
+            boolean fastBailOut) {
       super(
           operand(Project.class,
               operand(Join.class, any())),
           relBuilderFactory,
           "MaterializedViewJoinRule(Project-Join)",
-          generateUnionRewriting);
+          generateUnionRewriting, unionRewritingPullProgram, fastBailOut);
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
@@ -789,13 +874,14 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
   /** Rule that matches Project on Filter. */
   public static class MaterializedViewProjectFilterRule extends MaterializedViewJoinRule {
     public MaterializedViewProjectFilterRule(RelBuilderFactory relBuilderFactory,
-            boolean generateUnionRewriting) {
+            boolean generateUnionRewriting, HepProgram unionRewritingPullProgram,
+            boolean fastBailOut) {
       super(
           operand(Project.class,
               operand(Filter.class, any())),
           relBuilderFactory,
           "MaterializedViewJoinRule(Project-Filter)",
-          generateUnionRewriting);
+          generateUnionRewriting, unionRewritingPullProgram, fastBailOut);
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
@@ -808,12 +894,13 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
   /** Rule that matches Join. */
   public static class MaterializedViewOnlyJoinRule extends MaterializedViewJoinRule {
     public MaterializedViewOnlyJoinRule(RelBuilderFactory relBuilderFactory,
-            boolean generateUnionRewriting) {
+            boolean generateUnionRewriting, HepProgram unionRewritingPullProgram,
+            boolean fastBailOut) {
       super(
           operand(Join.class, any()),
           relBuilderFactory,
           "MaterializedViewJoinRule(Join)",
-          generateUnionRewriting);
+          generateUnionRewriting, unionRewritingPullProgram, fastBailOut);
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
@@ -825,12 +912,13 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
   /** Rule that matches Filter. */
   public static class MaterializedViewOnlyFilterRule extends MaterializedViewJoinRule {
     public MaterializedViewOnlyFilterRule(RelBuilderFactory relBuilderFactory,
-            boolean generateUnionRewriting) {
+            boolean generateUnionRewriting, HepProgram unionRewritingPullProgram,
+            boolean fastBailOut) {
       super(
           operand(Filter.class, any()),
           relBuilderFactory,
           "MaterializedViewJoinRule(Filter)",
-          generateUnionRewriting);
+          generateUnionRewriting, unionRewritingPullProgram, fastBailOut);
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
@@ -844,11 +932,40 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
   /** Materialized view rewriting for aggregate */
   private abstract static class MaterializedViewAggregateRule
           extends AbstractMaterializedViewRule {
+
+    private static final ImmutableList<TimeUnitRange> SUPPORTED_DATE_TIME_ROLLUP_UNITS =
+        ImmutableList.of(TimeUnitRange.YEAR, TimeUnitRange.QUARTER, TimeUnitRange.MONTH,
+            TimeUnitRange.DAY, TimeUnitRange.HOUR, TimeUnitRange.MINUTE,
+            TimeUnitRange.SECOND, TimeUnitRange.MILLISECOND, TimeUnitRange.MICROSECOND);
+
+    //~ Instance fields --------------------------------------------------------
+
+    /** Instance of rule to push filter through project. */
+    protected final RelOptRule filterProjectTransposeRule;
+
+    /** Instance of rule to push filter through aggregate. */
+    protected final RelOptRule filterAggregateTransposeRule;
+
+    /** Instance of rule to pull up constants into aggregate. */
+    protected final RelOptRule aggregateProjectPullUpConstantsRule;
+
+    /** Instance of rule to merge project operators. */
+    protected final RelOptRule projectMergeRule;
+
+
     /** Creates a MaterializedViewAggregateRule. */
     protected MaterializedViewAggregateRule(RelOptRuleOperand operand,
         RelBuilderFactory relBuilderFactory, String description,
-        boolean generateUnionRewriting) {
-      super(operand, relBuilderFactory, description, generateUnionRewriting);
+        boolean generateUnionRewriting, HepProgram unionRewritingPullProgram) {
+      super(operand, relBuilderFactory, description, generateUnionRewriting,
+          unionRewritingPullProgram, false);
+      this.filterProjectTransposeRule = new FilterProjectTransposeRule(
+          Filter.class, Project.class, true, true, relBuilderFactory);
+      this.filterAggregateTransposeRule = new FilterAggregateTransposeRule(
+          Filter.class, relBuilderFactory, Aggregate.class);
+      this.aggregateProjectPullUpConstantsRule = new AggregateProjectPullUpConstantsRule(
+          Aggregate.class, Filter.class, relBuilderFactory, "AggFilterPullUpConstants");
+      this.projectMergeRule = new ProjectMergeRule(true, relBuilderFactory);
     }
 
     @Override protected boolean isValidPlan(Project topProject, RelNode node,
@@ -924,14 +1041,14 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
         relBuilder.join(JoinRelType.INNER, rexBuilder.makeLiteral(true));
         offset += newRel.getRowType().getFieldCount();
       }
-      // Modify aggregate: add grouping columns and shift aggregation arguments
+      // Modify aggregate: add grouping columns
       ImmutableBitSet.Builder groupSet = ImmutableBitSet.builder();
       groupSet.addAll(aggregateViewNode.getGroupSet());
       groupSet.addAll(
           ImmutableBitSet.range(
               aggregateViewNode.getInput().getRowType().getFieldCount(),
               aggregateViewNode.getInput().getRowType().getFieldCount() + offset));
-      final RelNode newViewNode = aggregateViewNode.copy(
+      final Aggregate newViewNode = aggregateViewNode.copy(
           aggregateViewNode.getTraitSet(), relBuilder.build(), aggregateViewNode.indicator,
           groupSet.build(), null, aggregateViewNode.getAggCallList());
 
@@ -939,9 +1056,19 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       List<RexNode> nodes = new ArrayList<>();
       List<String> fieldNames = new ArrayList<>();
       if (topViewProject != null) {
-        // Insert existing expressions, then append rest of columns
-        nodes.addAll(topViewProject.getChildExps());
-        fieldNames.addAll(topViewProject.getRowType().getFieldNames());
+        // Insert existing expressions (and shift aggregation arguments),
+        // then append rest of columns
+        Mappings.TargetMapping shiftMapping = Mappings.createShiftMapping(
+            newViewNode.getRowType().getFieldCount(),
+            0, 0, aggregateViewNode.getGroupCount(),
+            newViewNode.getGroupCount(), aggregateViewNode.getGroupCount(),
+            aggregateViewNode.getAggCallList().size());
+        for (int i = 0; i < topViewProject.getChildExps().size(); i++) {
+          nodes.add(
+              topViewProject.getChildExps().get(i).accept(
+                  new RexPermuteInputsShuttle(shiftMapping, newViewNode)));
+          fieldNames.add(topViewProject.getRowType().getFieldNames().get(i));
+        }
         for (int i = aggregateViewNode.getRowType().getFieldCount();
                 i < newViewNode.getRowType().getFieldCount(); i++) {
           int idx = i - aggregateViewNode.getAggCallList().size();
@@ -981,14 +1108,29 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
         BiMap<RelTableRef, RelTableRef> queryToViewTableMapping,
         EquivalenceClasses viewEC, EquivalenceClasses queryEC) {
       Aggregate aggregate = (Aggregate) node;
-      // All columns required by compensating predicates must be contained
-      // in the query.
-      RelNode aggregateInput = aggregate.getInput(0);
-      List<RexNode> queryExprs = extractReferences(rexBuilder, aggregateInput);
 
+      // Our target node is the node below the root, which should have the maximum
+      // number of available expressions in the tree in order to maximize our
+      // number of rewritings.
+      // If the program is available, we execute it to maximize rewriting opportunities.
+      // For instance, a program might pull up all the expressions that are below the
+      // aggregate so we can introduce compensation filters easily. This is important
+      // depending on the planner strategy.
+      RelNode newAggregateInput = aggregate.getInput(0);
+      RelNode target = aggregate.getInput(0);
+      if (unionRewritingPullProgram != null) {
+        final HepPlanner tmpPlanner = new HepPlanner(unionRewritingPullProgram);
+        tmpPlanner.setRoot(newAggregateInput);
+        newAggregateInput = tmpPlanner.findBestExp();
+        target = newAggregateInput.getInput(0);
+      }
+
+      // We need to check that all columns required by compensating predicates
+      // are contained in the query.
+      List<RexNode> queryExprs = extractReferences(rexBuilder, target);
       if (!compensationColumnsEquiPred.isAlwaysTrue()) {
         compensationColumnsEquiPred = rewriteExpression(rexBuilder, mq,
-            aggregateInput, queryExprs, queryToViewTableMapping, queryEC, false,
+            target, queryExprs, queryToViewTableMapping, queryEC, false,
             compensationColumnsEquiPred);
         if (compensationColumnsEquiPred == null) {
           // Skip it
@@ -998,7 +1140,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       // For the rest, we use the query equivalence classes
       if (!otherCompensationPred.isAlwaysTrue()) {
         otherCompensationPred = rewriteExpression(rexBuilder, mq,
-            aggregateInput, queryExprs, queryToViewTableMapping, viewEC, true,
+            target, queryExprs, queryToViewTableMapping, viewEC, true,
             otherCompensationPred);
         if (otherCompensationPred == null) {
           // Skip it
@@ -1014,10 +1156,17 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
               false));
 
       // Generate query rewriting.
-      relBuilder.push(aggregateInput);
-      relBuilder.filter(simplify.simplify(queryCompensationPred));
-      return aggregate.copy(
-          aggregate.getTraitSet(), ImmutableList.of(relBuilder.build()));
+      RelNode rewrittenPlan = relBuilder
+          .push(target)
+          .filter(simplify.simplify(queryCompensationPred))
+          .build();
+      if (unionRewritingPullProgram != null) {
+        return aggregate.copy(aggregate.getTraitSet(),
+            ImmutableList.of(
+                newAggregateInput.copy(newAggregateInput.getTraitSet(),
+                    ImmutableList.of(rewrittenPlan))));
+      }
+      return aggregate.copy(aggregate.getTraitSet(), ImmutableList.of(rewrittenPlan));
     }
 
     @Override protected RelNode createUnion(RelBuilder relBuilder, RexBuilder rexBuilder,
@@ -1049,9 +1198,15 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
           // Cannot ROLLUP distinct
           return null;
         }
+        SqlAggFunction rollupAgg =
+            getRollup(aggCall.getAggregation());
+        if (rollupAgg == null) {
+          // Cannot rollup this aggregate, bail out
+          return null;
+        }
         aggregateCalls.add(
             relBuilder.aggregateCall(
-                SubstitutionVisitor.getRollup(aggCall.getAggregation()),
+                rollupAgg,
                 aggCall.isDistinct(), aggCall.isApproximate(), null,
                 aggCall.name,
                 rexBuilder.makeInputRef(relBuilder.peek(),
@@ -1079,6 +1234,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
     @Override protected RelNode rewriteView(
         RelBuilder relBuilder,
         RexBuilder rexBuilder,
+        RexSimplify simplify,
         RelMetadataQuery mq,
         MatchModality matchModality,
         boolean unionRewriting,
@@ -1125,24 +1281,39 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       }
 
       // Create mapping from query columns to view columns
-      Multimap<Integer, Integer> m = generateMapping(rexBuilder, mq, queryAggregate.getInput(),
-          viewAggregate.getInput(), indexes.build(), queryToViewTableMapping, queryEC);
+      List<RexNode> rollupNodes = new ArrayList<>();
+      Multimap<Integer, Integer> m = generateMapping(rexBuilder, simplify, mq,
+          queryAggregate.getInput(), viewAggregate.getInput(), indexes.build(),
+          queryToViewTableMapping, queryEC, rollupNodes);
       if (m == null) {
         // Bail out
         return null;
       }
 
       // We could map all expressions. Create aggregate mapping.
+      int viewAggregateAdditionalFieldCount = rollupNodes.size();
+      int viewInputFieldCount = viewAggregate.getInput().getRowType().getFieldCount();
+      int viewInputDifferenceViewFieldCount =
+          viewAggregate.getRowType().getFieldCount() - viewInputFieldCount;
+      int viewAggregateTotalFieldCount =
+          viewAggregate.getRowType().getFieldCount() + rollupNodes.size();
+      boolean forceRollup = false;
       Mapping aggregateMapping = Mappings.create(MappingType.FUNCTION,
-          queryAggregate.getRowType().getFieldCount(), viewAggregate.getRowType().getFieldCount());
+          queryAggregate.getRowType().getFieldCount(), viewAggregateTotalFieldCount);
       for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
         Collection<Integer> c = m.get(queryAggregate.getGroupSet().nth(i));
         for (int j : c) {
-          int targetIdx = viewAggregate.getGroupSet().indexOf(j);
-          if (targetIdx == -1) {
-            continue;
+          if (j >= viewAggregate.getInput().getRowType().getFieldCount()) {
+            // This is one of the rollup columns
+            aggregateMapping.set(i, j + viewInputDifferenceViewFieldCount);
+            forceRollup = true;
+          } else {
+            int targetIdx = viewAggregate.getGroupSet().indexOf(j);
+            if (targetIdx == -1) {
+              continue;
+            }
+            aggregateMapping.set(i, targetIdx);
           }
-          aggregateMapping.set(i, targetIdx);
           break;
         }
         if (aggregateMapping.getTargetOpt(i) == -1) {
@@ -1167,7 +1338,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
         }
         for (int j = 0; j < viewAggregate.getAggCallList().size(); j++) {
           AggregateCall viewAggCall = viewAggregate.getAggCallList().get(j);
-          if (queryAggCall.getAggregation() != viewAggCall.getAggregation()
+          if (queryAggCall.getAggregation().getKind() != viewAggCall.getAggregation().getKind()
               || queryAggCall.isDistinct() != viewAggCall.isDistinct()
               || queryAggCall.getArgList().size() != viewAggCall.getArgList().size()
               || queryAggCall.getType() != viewAggCall.getType()
@@ -1188,12 +1359,23 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
         }
       }
 
+      // If we reach here, to simplify things, we create an identity topViewProject
+      // if not present
+      if (topViewProject == null) {
+        topViewProject = (Project) relBuilder.push(viewNode)
+            .project(relBuilder.fields(), ImmutableList.<String>of(), true).build();
+      }
+
       // Generate result rewriting
+      List<String> additionalViewExprs = new ArrayList<>();
       Mapping rewritingMapping = null;
-      RelNode result = relBuilder
-          .push(input)
-          .build();
-      if (queryAggregate.getGroupCount() != viewAggregate.getGroupCount()
+      RelNode result = relBuilder.push(input).build();
+      // We create view expressions that will be used in a Project on top of the
+      // view in case we need to rollup the expression
+      final List<RexNode> inputViewExprs = new ArrayList<>();
+      inputViewExprs.addAll(relBuilder.push(result).fields());
+      relBuilder.clear();
+      if (forceRollup || queryAggregate.getGroupCount() != viewAggregate.getGroupCount()
           || matchModality == MatchModality.VIEW_PARTIAL) {
         if (containsDistinctAgg) {
           // Cannot rollup DISTINCT aggregate
@@ -1201,8 +1383,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
         }
         // Target is coarser level of aggregation. Generate an aggregate.
         rewritingMapping = Mappings.create(MappingType.FUNCTION,
-            topViewProject != null ? topViewProject.getRowType().getFieldCount()
-                : viewAggregate.getRowType().getFieldCount(),
+            topViewProject.getRowType().getFieldCount() + viewAggregateAdditionalFieldCount,
             queryAggregate.getRowType().getFieldCount());
         final ImmutableBitSet.Builder groupSetB = ImmutableBitSet.builder();
         for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
@@ -1211,9 +1392,44 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
             // No matching group by column, we bail out
             return null;
           }
-          if (topViewProject != null) {
-            boolean added = false;
-            for (int k = 0; k < topViewProject.getChildExps().size(); k++) {
+          boolean added = false;
+          if (targetIdx >= viewAggregate.getRowType().getFieldCount()) {
+            RexNode targetNode = rollupNodes.get(
+                targetIdx - viewInputFieldCount - viewInputDifferenceViewFieldCount);
+            // We need to rollup this expression
+            final Multimap<String, Integer> exprsLineage = ArrayListMultimap.create();
+            final ImmutableBitSet refs = RelOptUtil.InputFinder.bits(targetNode);
+            for (int childTargetIdx : refs) {
+              added = false;
+              for (int k = 0; k < topViewProject.getChildExps().size() && !added; k++) {
+                RexNode n = topViewProject.getChildExps().get(k);
+                if (!n.isA(SqlKind.INPUT_REF)) {
+                  continue;
+                }
+                int ref = ((RexInputRef) n).getIndex();
+                if (ref == childTargetIdx) {
+                  exprsLineage.put(
+                      new RexInputRef(childTargetIdx, targetNode.getType()).toString(), k);
+                  added = true;
+                }
+              }
+              if (!added) {
+                // No matching column needed for computed expression, bail out
+                return null;
+              }
+            }
+            // We create the new node pointing to the index
+            groupSetB.set(inputViewExprs.size());
+            rewritingMapping.set(inputViewExprs.size(), i);
+            additionalViewExprs.add(
+                new RexInputRef(targetIdx, targetNode.getType()).toString());
+            // We need to create the rollup expression
+            inputViewExprs.add(
+                shuttleReferences(rexBuilder, targetNode, exprsLineage));
+            added = true;
+          } else {
+            // This expression should be referenced directly
+            for (int k = 0; k < topViewProject.getChildExps().size() && !added; k++) {
               RexNode n = topViewProject.getChildExps().get(k);
               if (!n.isA(SqlKind.INPUT_REF)) {
                 continue;
@@ -1223,16 +1439,12 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
                 groupSetB.set(k);
                 rewritingMapping.set(k, i);
                 added = true;
-                break;
               }
             }
-            if (!added) {
-              // No matching group by column, we bail out
-              return null;
-            }
-          } else {
-            groupSetB.set(targetIdx);
-            rewritingMapping.set(targetIdx, i);
+          }
+          if (!added) {
+            // No matching group by column, we bail out
+            return null;
           }
         }
         final ImmutableBitSet groupSet = groupSetB.build();
@@ -1243,49 +1455,47 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
             continue;
           }
           int sourceIdx = queryAggregate.getGroupCount() + i;
-          int targetIdx = aggregateMapping.getTargetOpt(sourceIdx);
-          if (targetIdx == -1) {
+          int targetIdx =
+              aggregateMapping.getTargetOpt(sourceIdx);
+          if (targetIdx < 0) {
             // No matching aggregation column, we bail out
             return null;
           }
           AggregateCall queryAggCall = queryAggregate.getAggCallList().get(i);
-          if (topViewProject != null) {
-            boolean added = false;
-            for (int k = 0; k < topViewProject.getChildExps().size(); k++) {
-              RexNode n = topViewProject.getChildExps().get(k);
-              if (!n.isA(SqlKind.INPUT_REF)) {
-                continue;
-              }
-              int ref = ((RexInputRef) n).getIndex();
-              if (ref == targetIdx) {
-                aggregateCalls.add(
-                    relBuilder.aggregateCall(
-                        SubstitutionVisitor.getRollup(queryAggCall.getAggregation()),
-                        queryAggCall.isDistinct(), queryAggCall.isApproximate(),
-                        null, queryAggCall.name,
-                        rexBuilder.makeInputRef(input, k)));
-                rewritingMapping.set(k, sourceIdx);
-                added = true;
-                break;
-              }
+          boolean added = false;
+          for (int k = 0; k < topViewProject.getChildExps().size() && !added; k++) {
+            RexNode n = topViewProject.getChildExps().get(k);
+            if (!n.isA(SqlKind.INPUT_REF)) {
+              continue;
             }
-            if (!added) {
-              // No matching aggregation column, we bail out
-              return null;
+            int ref = ((RexInputRef) n).getIndex();
+            if (ref == targetIdx) {
+              SqlAggFunction rollupAgg =
+                  getRollup(queryAggCall.getAggregation());
+              if (rollupAgg == null) {
+                // Cannot rollup this aggregate, bail out
+                return null;
+              }
+              aggregateCalls.add(
+                  relBuilder.aggregateCall(
+                      rollupAgg, queryAggCall.isDistinct(), queryAggCall.isApproximate(),
+                      null, queryAggCall.name, rexBuilder.makeInputRef(input, k)));
+              rewritingMapping.set(k, sourceIdx);
+              added = true;
             }
-          } else {
-            aggregateCalls.add(
-                relBuilder.aggregateCall(
-                    SubstitutionVisitor.getRollup(queryAggCall.getAggregation()),
-                    queryAggCall.isDistinct(), queryAggCall.isApproximate(),
-                    null, queryAggCall.name,
-                    rexBuilder.makeInputRef(input, targetIdx)));
-            rewritingMapping.set(targetIdx, sourceIdx);
+          }
+          if (!added) {
+            // No matching aggregation column, we bail out
+            return null;
           }
         }
+        // Create aggregate on top of input
         RelNode prevNode = result;
+        relBuilder.push(result);
+        if (inputViewExprs.size() != result.getRowType().getFieldCount()) {
+          relBuilder.project(inputViewExprs);
+        }
         result = relBuilder
-            .push(result)
             .aggregate(relBuilder.groupKey(groupSet, null), aggregateCalls)
             .build();
         if (prevNode == result && groupSet.cardinality() != result.getRowType().getFieldCount()) {
@@ -1318,65 +1528,252 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
       // and we have introduced already an operator on top of the input node,
       // we use the mapping to resolve the position of the expression in the
       // node.
+      final RelDataType topRowType;
       final List<RexNode> topExprs = new ArrayList<>();
       if (topProject != null && !unionRewriting) {
         topExprs.addAll(topProject.getChildExps());
+        topRowType = topProject.getRowType();
       } else {
         // Add all
         for (int pos = 0; pos < queryAggregate.getRowType().getFieldCount(); pos++) {
           topExprs.add(rexBuilder.makeInputRef(queryAggregate, pos));
         }
+        topRowType = queryAggregate.getRowType();
       }
       // Available in view.
-      final List<String> viewExprs = new ArrayList<>();
-      if (topViewProject != null) {
-        for (int i = 0; i < topViewProject.getChildExps().size(); i++) {
-          viewExprs.add(topViewProject.getChildExps().get(i).toString());
-        }
-      } else {
-        // Add all
-        for (int i = 0; i < viewAggregate.getRowType().getFieldCount(); i++) {
-          viewExprs.add(rexBuilder.makeInputRef(viewAggregate, i).toString());
-        }
+      final Multimap<String, Integer> viewExprs = ArrayListMultimap.create();
+      int numberViewExprs = 0;
+      for (RexNode viewExpr : topViewProject.getChildExps()) {
+        viewExprs.put(viewExpr.toString(), numberViewExprs++);
+      }
+      for (String additionalViewExpr : additionalViewExprs) {
+        viewExprs.put(additionalViewExpr, numberViewExprs++);
       }
       final List<RexNode> rewrittenExprs = new ArrayList<>(topExprs.size());
       for (RexNode expr : topExprs) {
+        // First map through the aggregate
         RexNode rewrittenExpr = shuttleReferences(rexBuilder, expr, aggregateMapping);
         if (rewrittenExpr == null) {
           // Cannot map expression
           return null;
         }
-        int pos = viewExprs.indexOf(rewrittenExpr.toString());
-        if (pos == -1) {
+        // Next map through the last project
+        rewrittenExpr =
+            shuttleReferences(rexBuilder, rewrittenExpr, viewExprs, result, rewritingMapping);
+        if (rewrittenExpr == null) {
           // Cannot map expression
           return null;
         }
-        if (rewritingMapping != null) {
-          pos = rewritingMapping.getTargetOpt(pos);
-          if (pos == -1) {
-            // Cannot map expression
-            return null;
-          }
-        }
-        rewrittenExprs.add(rexBuilder.makeInputRef(result, pos));
+        rewrittenExprs.add(rewrittenExpr);
       }
       return relBuilder
           .push(result)
           .project(rewrittenExprs)
+          .convert(topRowType, false)
           .build();
     }
+
+    /**
+     * Maps the given positions of {@code node} to equivalent {@code target} columns.
+     *
+     * <p>Derived expressions go to {@code additionalExprs}; returns null if mapping fails.
+     */
+    protected Multimap<Integer, Integer> generateMapping(
+        RexBuilder rexBuilder,
+        RexSimplify simplify,
+        RelMetadataQuery mq,
+        RelNode node,
+        RelNode target,
+        ImmutableBitSet positions,
+        BiMap<RelTableRef, RelTableRef> tableMapping,
+        EquivalenceClasses sourceEC,
+        List<RexNode> additionalExprs) {
+      Preconditions.checkArgument(additionalExprs.isEmpty());
+      Multimap<Integer, Integer> m = ArrayListMultimap.create();
+      Map<RexTableInputRef, Set<RexTableInputRef>> equivalenceClassesMap =
+          sourceEC.getEquivalenceClassesMap();
+      Multimap<String, Integer> exprsLineage = ArrayListMultimap.create();
+      List<RexNode> timestampExprs = new ArrayList<>();
+      for (int i = 0; i < target.getRowType().getFieldCount(); i++) {
+        Set<RexNode> s = mq.getExpressionLineage(target, rexBuilder.makeInputRef(target, i));
+        if (s == null) {
+          // No lineage for this target column: skip it (the mapping itself does not fail)
+          continue;
+        }
+        // We only support project - filter - join, thus it should map to
+        // a single expression
+        assert s.size() == 1;
+        // Rewrite expr to be expressed on query tables
+        RexNode expr = RexUtil.swapTableColumnReferences(
+            rexBuilder,
+            simplify.simplify(s.iterator().next()),
+            tableMapping.inverse(),
+            equivalenceClassesMap);
+        exprsLineage.put(expr.toString(), i);
+        SqlTypeName sqlTypeName = expr.getType().getSqlTypeName();
+        if (sqlTypeName == SqlTypeName.TIMESTAMP
+            || sqlTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+          timestampExprs.add(expr);
+        }
+      }
+
+      // If this is a column of TIMESTAMP (WITH LOCAL TIME ZONE)
+      // type, we add the possible rollup columns too.
+      // This way we will be able to match FLOOR(ts to HOUR) to
+      // FLOOR(ts to DAY) via FLOOR(FLOOR(ts to HOUR) to DAY)
+      for (RexNode timestampExpr : timestampExprs) {
+        for (TimeUnitRange value : SUPPORTED_DATE_TIME_ROLLUP_UNITS) {
+          // CEIL
+          RexNode ceilExpr =
+              rexBuilder.makeCall(getCeilSqlFunction(value),
+                  timestampExpr, rexBuilder.makeFlag(value));
+          // References self-row
+          RexNode rewrittenCeilExpr =
+              shuttleReferences(rexBuilder, ceilExpr, exprsLineage);
+          if (rewrittenCeilExpr != null) {
+            // We add the CEIL expression to the additional expressions, replacing the child
+            // expression by the position that it references
+            additionalExprs.add(rewrittenCeilExpr);
+            // Then we simplify the expression and we add it to the expressions lineage so we
+            // can try to find a match
+            exprsLineage.put(simplify.simplify(ceilExpr).toString(),
+                target.getRowType().getFieldCount() + additionalExprs.size() - 1);
+          }
+          // FLOOR
+          RexNode floorExpr =
+              rexBuilder.makeCall(getFloorSqlFunction(value),
+                  timestampExpr, rexBuilder.makeFlag(value));
+          // References self-row
+          RexNode rewrittenFloorExpr =
+              shuttleReferences(rexBuilder, floorExpr, exprsLineage);
+          if (rewrittenFloorExpr != null) {
+            // We add the FLOOR expression to the additional expressions, replacing the child
+            // expression by the position that it references
+            additionalExprs.add(rewrittenFloorExpr);
+            // Then we simplify the expression and we add it to the expressions lineage so we
+            // can try to find a match
+            exprsLineage.put(simplify.simplify(floorExpr).toString(),
+                target.getRowType().getFieldCount() + additionalExprs.size() - 1);
+          }
+        }
+      }
+
+      for (int i : positions) {
+        Set<RexNode> s = mq.getExpressionLineage(node, rexBuilder.makeInputRef(node, i));
+        if (s == null) {
+          // No lineage for a requested node column: the whole mapping fails
+          return null;
+        }
+        // We only support project - filter - join, thus it should map to
+        // a single expression
+        assert s.size() == 1;
+        // Rewrite expr to be expressed on query tables
+        RexNode targetExpr = RexUtil.swapColumnReferences(
+            rexBuilder, simplify.simplify(s.iterator().next()),
+            equivalenceClassesMap);
+        Collection<Integer> c = exprsLineage.get(targetExpr.toString());
+        if (!c.isEmpty()) {
+          for (Integer j : c) {
+            m.put(i, j);
+          }
+        } else {
+          // No direct match: try to rebuild the expression from the known lineage entries
+          RexNode rewrittenTargetExpr =
+              shuttleReferences(rexBuilder, targetExpr, exprsLineage);
+          if (rewrittenTargetExpr == null) {
+            // Some expressions were not present
+            return null;
+          }
+          m.put(i, target.getRowType().getFieldCount() + additionalExprs.size());
+          additionalExprs.add(rewrittenTargetExpr);
+        }
+      }
+      return m;
+    }
+
+    /**
+     * Returns the CEIL function for datetime rollups; this version ignores {@code flag}.
+     */
+    protected SqlFunction getCeilSqlFunction(TimeUnitRange flag) {
+      return SqlStdOperatorTable.CEIL;
+    }
+
+    /**
+     * Returns the FLOOR function for datetime rollups; this version ignores {@code flag}.
+     */
+    protected SqlFunction getFloorSqlFunction(TimeUnitRange flag) {
+      return SqlStdOperatorTable.FLOOR;
+    }
+
+    /**
+     * Returns the aggregate used to roll up {@code aggregation}, or null if it has none.
+     */
+    protected SqlAggFunction getRollup(SqlAggFunction aggregation) {
+      if (aggregation == SqlStdOperatorTable.SUM
+          || aggregation == SqlStdOperatorTable.MIN
+          || aggregation == SqlStdOperatorTable.MAX
+          || aggregation == SqlStdOperatorTable.SUM0) {
+        return aggregation;
+      } else if (aggregation == SqlStdOperatorTable.COUNT) {
+        return SqlStdOperatorTable.SUM0;
+      } else {
+        return null;
+      }
+    }
+
+    @Override public Pair<RelNode, RelNode> pushFilterToOriginalViewPlan(RelBuilder builder,
+        RelNode topViewProject, RelNode viewNode, RexNode cond) {
+      // We add (and push) the filter to the view plan before triggering the rewriting.
+      // This is useful in case some of the columns can be folded to the same value
+      // after the filter is added.
+      HepProgramBuilder pushFiltersProgram = new HepProgramBuilder();
+      if (topViewProject != null) {
+        pushFiltersProgram.addRuleInstance(filterProjectTransposeRule);
+      }
+      pushFiltersProgram
+          .addRuleInstance(this.filterAggregateTransposeRule)
+          .addRuleInstance(this.aggregateProjectPullUpConstantsRule)
+          .addRuleInstance(this.projectMergeRule);
+      final HepPlanner tmpPlanner = new HepPlanner(pushFiltersProgram.build());
+      // Now that the planner is created, push the node
+      RelNode topNode = builder
+          .push(topViewProject != null ? topViewProject : viewNode)
+          .filter(cond).build();
+      tmpPlanner.setRoot(topNode);
+      topNode = tmpPlanner.findBestExp();
+      RelNode resultTopViewProject = null;
+      RelNode resultViewNode = null;
+      while (topNode != null) {
+        if (topNode instanceof Project) {
+          if (resultTopViewProject != null) {
+            // A second Project remains (merge failed): bail out, returning the original pair
+            return Pair.of(topViewProject, viewNode);
+          }
+          resultTopViewProject = topNode;
+          topNode = topNode.getInput(0);
+        } else if (topNode instanceof Aggregate) {
+          resultViewNode = topNode;
+          topNode = null;
+        } else {
+          // Any other operator: keep descending through its first input
+          topNode = topNode.getInput(0);
+        }
+      }
+      return Pair.of(resultTopViewProject, resultViewNode);
+    }
   }
 
   /** Rule that matches Project on Aggregate. */
   public static class MaterializedViewProjectAggregateRule extends MaterializedViewAggregateRule {
+
     public MaterializedViewProjectAggregateRule(RelBuilderFactory relBuilderFactory,
-            boolean generateUnionRewriting) {
+            boolean generateUnionRewriting, HepProgram unionRewritingPullProgram) {
       super(
           operand(Project.class,
               operand(Aggregate.class, any())),
           relBuilderFactory,
           "MaterializedViewAggregateRule(Project-Aggregate)",
-          generateUnionRewriting);
+          generateUnionRewriting, unionRewritingPullProgram);
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
@@ -1388,13 +1785,14 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
 
   /** Rule that matches Aggregate. */
   public static class MaterializedViewOnlyAggregateRule extends MaterializedViewAggregateRule {
+
     public MaterializedViewOnlyAggregateRule(RelBuilderFactory relBuilderFactory,
-            boolean generateUnionRewriting) {
+            boolean generateUnionRewriting, HepProgram unionRewritingPullProgram) {
       super(
           operand(Aggregate.class, any()),
           relBuilderFactory,
           "MaterializedViewAggregateRule(Aggregate)",
-          generateUnionRewriting);
+          generateUnionRewriting, unionRewritingPullProgram);
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
@@ -1958,66 +2356,6 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
   }
 
   /**
-   * Mapping from node expressions to target expressions.
-   *
-   * <p>If any of the expressions cannot be mapped, we return null.
-   */
-  private static Multimap<Integer, Integer> generateMapping(
-      RexBuilder rexBuilder,
-      RelMetadataQuery mq,
-      RelNode node,
-      RelNode target,
-      ImmutableBitSet positions,
-      BiMap<RelTableRef, RelTableRef> tableMapping,
-      EquivalenceClasses sourceEC) {
-    Map<RexTableInputRef, Set<RexTableInputRef>> equivalenceClassesMap =
-            sourceEC.getEquivalenceClassesMap();
-    Multimap<String, Integer> exprsLineage = ArrayListMultimap.create();
-    for (int i = 0; i < target.getRowType().getFieldCount(); i++) {
-      Set<RexNode> s = mq.getExpressionLineage(target, rexBuilder.makeInputRef(target, i));
-      if (s == null) {
-        // Bail out
-        continue;
-      }
-      // We only support project - filter - join, thus it should map to
-      // a single expression
-      assert s.size() == 1;
-      // Rewrite expr to be expressed on query tables
-      exprsLineage.put(
-          RexUtil.swapTableColumnReferences(
-              rexBuilder,
-              s.iterator().next(),
-              tableMapping.inverse(),
-              equivalenceClassesMap).toString(),
-          i);
-    }
-
-    Multimap<Integer, Integer> m = ArrayListMultimap.create();
-    for (int i : positions) {
-      Set<RexNode> s = mq.getExpressionLineage(node, rexBuilder.makeInputRef(node, i));
-      if (s == null) {
-        // Bail out
-        return null;
-      }
-      // We only support project - filter - join, thus it should map to
-      // a single expression
-      assert s.size() == 1;
-      // Rewrite expr to be expressed on query tables
-      Collection<Integer> c = exprsLineage.get(
-          RexUtil.swapColumnReferences(
-              rexBuilder, s.iterator().next(), equivalenceClassesMap).toString());
-      if (c == null) {
-        // Bail out
-        return null;
-      }
-      for (Integer j : c) {
-        m.put(i, j);
-      }
-    }
-    return m;
-  }
-
-  /**
    * Given the input expression, it will replace (sub)expressions when possible
    * using the content of the mapping. In particular, the mapping contains the
    * digest of the expression and the index that the replacement input ref should
@@ -2088,6 +2426,95 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
   }
 
   /**
+   * Replaces subexpressions whose digest appears in {@code exprsLineage} by
+   * input references; returns null if some reference cannot be mapped.
+   */
+  private static RexNode shuttleReferences(final RexBuilder rexBuilder,
+      final RexNode expr, final Multimap<String, Integer> exprsLineage) {
+    return shuttleReferences(rexBuilder, expr,
+        exprsLineage, null, null);
+  }
+
+  /**
+   * Replaces subexpressions found in {@code exprsLineage} by input references.
+   * If {@code rewritingMapping} is given, positions are translated through it.
+   * The reference type comes from {@code node} if given, else from the replaced
+   * expression. Returns null if an input reference cannot be mapped.
+   */
+  private static RexNode shuttleReferences(final RexBuilder rexBuilder,
+      final RexNode expr, final Multimap<String, Integer> exprsLineage,
+      final RelNode node, final Mapping rewritingMapping) {
+    try {
+      RexShuttle visitor =
+          new RexShuttle() {
+            @Override public RexNode visitTableInputRef(RexTableInputRef ref) {
+              Collection<Integer> c = exprsLineage.get(ref.toString());
+              if (c.isEmpty()) {
+                // Cannot map expression
+                throw Util.FoundOne.NULL;
+              }
+              int pos = c.iterator().next();
+              if (rewritingMapping != null) {
+                pos = rewritingMapping.getTargetOpt(pos);
+                if (pos == -1) {
+                  // Cannot map expression
+                  throw Util.FoundOne.NULL;
+                }
+              }
+              if (node != null) {
+                return rexBuilder.makeInputRef(node, pos);
+              }
+              return rexBuilder.makeInputRef(ref.getType(), pos);
+            }
+
+            @Override public RexNode visitInputRef(RexInputRef inputRef) {
+              Collection<Integer> c = exprsLineage.get(inputRef.toString());
+              if (c.isEmpty()) {
+                // Cannot map expression
+                throw Util.FoundOne.NULL;
+              }
+              int pos = c.iterator().next();
+              if (rewritingMapping != null) {
+                pos = rewritingMapping.getTargetOpt(pos);
+                if (pos == -1) {
+                  // Cannot map expression
+                  throw Util.FoundOne.NULL;
+                }
+              }
+              if (node != null) {
+                return rexBuilder.makeInputRef(node, pos);
+              }
+              return rexBuilder.makeInputRef(inputRef.getType(), pos);
+            }
+
+            @Override public RexNode visitCall(final RexCall call) {
+              Collection<Integer> c = exprsLineage.get(call.toString());
+              if (c.isEmpty()) {
+                // No match for the whole call: fall back to visiting its operands
+                return super.visitCall(call);
+              }
+              int pos = c.iterator().next();
+              if (rewritingMapping != null) {
+                pos = rewritingMapping.getTargetOpt(pos);
+                if (pos == -1) {
+                  // Matched position is not in the mapping: fall back to the operands
+                  return super.visitCall(call);
+                }
+              }
+              if (node != null) {
+                return rexBuilder.makeInputRef(node, pos);
+              }
+              return rexBuilder.makeInputRef(call.getType(), pos);
+            }
+          };
+      return visitor.apply(expr);
+    } catch (Util.FoundOne ex) {
+      Util.swallow(ex, null);
+      return null;
+    }
+  }
+
+  /**
    * Class representing an equivalence class, i.e., a set of equivalent columns
    */
   private static class EquivalenceClasses {

http://git-wip-us.apache.org/repos/asf/calcite/blob/aa25dcbe/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectPullUpConstantsRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectPullUpConstantsRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectPullUpConstantsRule.java
index 75af27d..9074e81 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectPullUpConstantsRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectPullUpConstantsRule.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
@@ -166,7 +167,13 @@ public class AggregateProjectPullUpConstantsRule extends RelOptRule {
         expr = relBuilder.field(i - map.size());
       } else if (map.containsKey(i)) {
         // Re-generate the constant expression in the project.
-        expr = map.get(i);
+        RelDataType originalType =
+            aggregate.getRowType().getFieldList().get(projects.size()).getType();
+        if (!originalType.equals(map.get(i).getType())) {
+          expr = rexBuilder.makeCast(originalType, map.get(i), true);
+        } else {
+          expr = map.get(i);
+        }
       } else {
         // Project the aggregation expression, in its original
         // position.

http://git-wip-us.apache.org/repos/asf/calcite/blob/aa25dcbe/core/src/main/java/org/apache/calcite/rex/RexCopier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexCopier.java b/core/src/main/java/org/apache/calcite/rex/RexCopier.java
index 3563cdd..7b66086 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexCopier.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexCopier.java
@@ -84,7 +84,8 @@ class RexCopier extends RexShuttle {
   }
 
   public RexNode visitLiteral(RexLiteral literal) {
-    return new RexLiteral(literal.getValue(), copy(literal.getType()),
+    // Get the value as is
+    return new RexLiteral(RexLiteral.value(literal), copy(literal.getType()),
         literal.getTypeName());
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/aa25dcbe/core/src/test/java/org/apache/calcite/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcTest.java b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
index ec5118f..548d735 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
@@ -6900,6 +6900,35 @@ public class JdbcTest {
     }
   }
 
+  /**
+   * Test record holding an event id and a timestamp. Equality and hashing
+   * consider the event id only.
+   */
+  public static class Event {
+    public final int eventid;
+    public final Timestamp ts;
+
+    public Event(int eventid, Timestamp ts) {
+      this.eventid = eventid;
+      this.ts = ts;
+    }
+
+    @Override public String toString() {
+      return "Event [eventid: " + eventid + ", ts: " + ts + "]";
+    }
+
+    // Must stay consistent with equals(), which compares eventid only.
+    @Override public int hashCode() {
+      return eventid;
+    }
+
+    @Override public boolean equals(Object obj) {
+      return obj == this
+          || obj instanceof Event
+          && eventid == ((Event) obj).eventid;
+    }
+  }
+
   public static class FoodmartSchema {
     public final SalesFact[] sales_fact_1997 = {
       new SalesFact(100, 10),

http://git-wip-us.apache.org/repos/asf/calcite/blob/aa25dcbe/core/src/test/java/org/apache/calcite/test/MaterializationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MaterializationTest.java b/core/src/test/java/org/apache/calcite/test/MaterializationTest.java
index 1ff8393..6c52f93 100644
--- a/core/src/test/java/org/apache/calcite/test/MaterializationTest.java
+++ b/core/src/test/java/org/apache/calcite/test/MaterializationTest.java
@@ -45,6 +45,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.test.JdbcTest.Department;
 import org.apache.calcite.test.JdbcTest.Dependent;
 import org.apache.calcite.test.JdbcTest.Employee;
+import org.apache.calcite.test.JdbcTest.Event;
 import org.apache.calcite.test.JdbcTest.Location;
 import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
@@ -62,6 +63,7 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.sql.ResultSet;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -546,7 +548,7 @@ public class MaterializationTest {
         HR_FKUK_MODEL,
         CalciteAssert.checkResultContains(
             "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], "
-                + "expr#3=[+($t1, $t2)], C=[$t3], deptno=[$t0])\n"
+                + "expr#3=[+($t1, $t2)], $f0=[$t3], deptno=[$t0])\n"
                 + "  EnumerableAggregate(group=[{1}], agg#0=[$SUM0($2)])\n"
                 + "    EnumerableTableScan(table=[[hr, m0]])"));
   }
@@ -1143,8 +1145,8 @@ public class MaterializationTest {
             + "from \"emps\" where \"deptno\" > 10 group by \"deptno\"",
         HR_FKUK_MODEL,
         CalciteAssert.checkResultContains(
-            "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[+($t1, $t2)], "
-                + "deptno=[$t0], S=[$t3])\n"
+            "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[+($t1, $t2)],"
+                + " deptno=[$t0], $f1=[$t3])\n"
                 + "  EnumerableAggregate(group=[{1}], agg#0=[$SUM0($3)])\n"
                 + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[10], expr#5=[>($t1, $t4)], "
                 + "proj#0..3=[{exprs}], $condition=[$t5])\n"
@@ -1169,10 +1171,10 @@ public class MaterializationTest {
         HR_FKUK_MODEL,
         CalciteAssert.checkResultContains(
             "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[+($t0, $t2)], "
-                + "expr#4=[+($t1, $t2)], EXPR$0=[$t3], S=[$t4])\n"
+                + "expr#4=[+($t1, $t2)], $f0=[$t3], $f1=[$t4])\n"
                 + "  EnumerableAggregate(group=[{1}], agg#0=[$SUM0($3)])\n"
-                + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[10], expr#5=[>($t1, $t4)], "
-                + "proj#0..3=[{exprs}], $condition=[$t5])\n"
+                + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[10], "
+                + "expr#5=[>($t1, $t4)], proj#0..3=[{exprs}], $condition=[$t5])\n"
                 + "      EnumerableTableScan(table=[[hr, m0]])"));
   }
 
@@ -1187,6 +1189,97 @@ public class MaterializationTest {
             + "from \"emps\" where \"deptno\" > 10 group by \"deptno\"");
   }
 
+  @Test public void testAggregateMaterializationAggregateFuncs9() {
+    final String materialize = "select \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to month), count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to month)";
+    final String query = "select floor(cast('1997-01-20 12:34:56' as timestamp) to year), sum(\"empid\") as s\n"
+        + "from \"emps\" group by floor(cast('1997-01-20 12:34:56' as timestamp) to year)";
+    checkMaterialize(materialize, query); // view floors to month, query floors to year
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs10() {
+    final String materialize = "select \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to month), count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to month)";
+    final String query = "select floor(cast('1997-01-20 12:34:56' as timestamp) to year), sum(\"empid\") + 1 as s\n"
+        + "from \"emps\" group by floor(cast('1997-01-20 12:34:56' as timestamp) to year)";
+    checkMaterialize(materialize, query); // month view, year query selecting sum + 1
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs11() {
+    final String materialize = "select \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to second), count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to second)";
+    final String query = "select floor(cast('1997-01-20 12:34:56' as timestamp) to minute), sum(\"empid\") as s\n"
+        + "from \"emps\" group by floor(cast('1997-01-20 12:34:56' as timestamp) to minute)";
+    checkMaterialize(materialize, query); // view floors to second, query floors to minute
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs12() {
+    final String materialize = "select \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to second), count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to second)";
+    final String query = "select floor(cast('1997-01-20 12:34:56' as timestamp) to month), sum(\"empid\") as s\n"
+        + "from \"emps\" group by floor(cast('1997-01-20 12:34:56' as timestamp) to month)";
+    checkMaterialize(materialize, query); // view floors to second, query floors to month
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs13() {
+    final String materialize = "select \"empid\", cast('1997-01-20 12:34:56' as timestamp), count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", cast('1997-01-20 12:34:56' as timestamp)";
+    final String query = "select floor(cast('1997-01-20 12:34:56' as timestamp) to year), sum(\"empid\") as s\n"
+        + "from \"emps\" group by floor(cast('1997-01-20 12:34:56' as timestamp) to year)";
+    checkMaterialize(materialize, query); // view keeps the unfloored timestamp, query floors to year
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs14() {
+    final String materialize = "select \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to month), count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", floor(cast('1997-01-20 12:34:56' as timestamp) to month)";
+    final String query = "select floor(cast('1997-01-20 12:34:56' as timestamp) to hour), sum(\"empid\") as s\n"
+        + "from \"emps\" group by floor(cast('1997-01-20 12:34:56' as timestamp) to hour)";
+    checkMaterialize(materialize, query); // view floors to month, query floors to hour
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs15() {
+    final String materialize = "select \"eventid\", floor(cast(\"ts\" as timestamp) to second), count(*) + 1 as c, sum(\"eventid\") as s\n"
+        + "from \"events\" group by \"eventid\", floor(cast(\"ts\" as timestamp) to second)";
+    final String query = "select floor(cast(\"ts\" as timestamp) to minute), sum(\"eventid\") as s\n"
+        + "from \"events\" group by floor(cast(\"ts\" as timestamp) to minute)";
+    checkMaterialize(materialize, query); // column "ts": view floors to second, query to minute
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs16() {
+    final String materialize = "select \"eventid\", cast(\"ts\" as timestamp), count(*) + 1 as c, sum(\"eventid\") as s\n"
+        + "from \"events\" group by \"eventid\", cast(\"ts\" as timestamp)";
+    final String query = "select floor(cast(\"ts\" as timestamp) to year), sum(\"eventid\") as s\n"
+        + "from \"events\" group by floor(cast(\"ts\" as timestamp) to year)";
+    checkMaterialize(materialize, query); // column "ts": view keeps it unfloored, query floors to year
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs17() {
+    final String materialize = "select \"eventid\", floor(cast(\"ts\" as timestamp) to month), count(*) + 1 as c, sum(\"eventid\") as s\n"
+        + "from \"events\" group by \"eventid\", floor(cast(\"ts\" as timestamp) to month)";
+    final String query = "select floor(cast(\"ts\" as timestamp) to hour), sum(\"eventid\") as s\n"
+        + "from \"events\" group by floor(cast(\"ts\" as timestamp) to hour)";
+    // View floors to month, query to hour; the checker is given a scan of hr.events.
+    checkMaterialize(materialize, query, HR_FKUK_MODEL,
+        CalciteAssert.checkResultContains(
+            "EnumerableTableScan(table=[[hr, events]])"));
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs18() {
+    final String materialize = "select \"empid\", \"deptno\", count(*) + 1 as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", \"deptno\"";
+    final String query = "select \"empid\"*\"deptno\", sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\"*\"deptno\"";
+    checkMaterialize(materialize, query); // query groups by a product of the view's two keys
+  }
+
+  @Test public void testAggregateMaterializationAggregateFuncs19() {
+    final String materialize = "select \"empid\", \"deptno\", count(*) as c, sum(\"empid\") as s\n"
+        + "from \"emps\" group by \"empid\", \"deptno\"";
+    final String query = "select \"empid\" + 10, count(*) + 1 as c\n"
+        + "from \"emps\" group by \"empid\" + 10";
+    checkMaterialize(materialize, query); // query groups by empid + 10 and selects count(*) + 1
+  }
+
   @Test public void testJoinAggregateMaterializationNoAggregateFuncs1() {
     checkMaterialize(
         "select \"empid\", \"depts\".\"deptno\" from \"emps\"\n"
@@ -2245,6 +2338,12 @@ public class MaterializationTest {
         new Dependent(10, "San Francisco"),
         new Dependent(20, "San Diego"),
     };
+    public final Event[] events = {  // presumably the rows of the "events" table used in this class's SQL - confirm
+        new Event(100, new Timestamp(0)),  // Timestamp(0) is zero milliseconds since the epoch
+        new Event(200, new Timestamp(0)),
+        new Event(150, new Timestamp(0)),
+        new Event(110, null),  // the only row whose ts is null
+    };
 
     public final RelReferentialConstraint rcs0 =
         RelReferentialConstraintImpl.of(