You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2018/11/08 18:00:59 UTC

[4/6] calcite git commit: [CALCITE-2224] Support WITHIN GROUP clause for aggregate functions (Hongze Zhang)

[CALCITE-2224] Support WITHIN GROUP clause for aggregate functions (Hongze Zhang)

Close apache/calcite#871


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/7bc9f140
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/7bc9f140
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/7bc9f140

Branch: refs/heads/master
Commit: 7bc9f14032b7cf0761c0b2eefdb6bb588047ec8e
Parents: 4cc4613
Author: hongzezhang <ho...@tencent.com>
Authored: Thu Sep 27 17:55:02 2018 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Nov 8 10:00:28 2018 -0800

----------------------------------------------------------------------
 core/src/main/codegen/templates/Parser.jj       |  15 +-
 .../calcite/adapter/enumerable/AggImpState.java |   1 +
 .../enumerable/AggregateLambdaFactory.java      |  48 +++++
 .../adapter/enumerable/EnumerableAggregate.java | 195 ++++++++++++++-----
 .../OrderedAggregateLambdaFactory.java          | 105 ++++++++++
 .../SequencedAdderAggregateLambdaFactory.java   |  88 +++++++++
 .../adapter/enumerable/SourceSorter.java        |  60 ++++++
 .../org/apache/calcite/plan/RelOptUtil.java     |  10 +-
 .../calcite/plan/SubstitutionVisitor.java       |  13 +-
 .../calcite/prepare/CalciteCatalogReader.java   |   4 +-
 .../org/apache/calcite/rel/RelCollations.java   |  20 ++
 .../apache/calcite/rel/core/AggregateCall.java  | 112 ++++++++---
 .../org/apache/calcite/rel/core/Window.java     |   1 +
 .../calcite/rel/externalize/RelJsonReader.java  |   5 +-
 .../calcite/rel/rel2sql/SqlImplementor.java     |  51 +++--
 .../rel/rules/AbstractMaterializedViewRule.java |   2 +
 .../AggregateExpandDistinctAggregatesRule.java  |  27 +--
 .../rel/rules/AggregateExtractProjectRule.java  |   2 +-
 .../rel/rules/AggregateFilterTransposeRule.java |   1 +
 .../rel/rules/AggregateProjectMergeRule.java    |  59 +++---
 .../rel/rules/AggregateReduceFunctionsRule.java |  12 +-
 .../rel/rules/AggregateStarTableRule.java       |   3 +-
 .../rel/rules/AggregateUnionTransposeRule.java  |   8 +-
 .../calcite/rel/rules/SubQueryRemoveRule.java   |   3 +-
 .../java/org/apache/calcite/rex/RexBuilder.java |   3 +-
 .../apache/calcite/runtime/CalciteResource.java |  12 ++
 .../org/apache/calcite/sql/SqlAggFunction.java  |  59 +++++-
 .../apache/calcite/sql/SqlFilterOperator.java   |  47 +++--
 .../java/org/apache/calcite/sql/SqlKind.java    |   5 +
 .../org/apache/calcite/sql/SqlRankFunction.java |   3 +-
 .../calcite/sql/SqlSplittableAggFunction.java   |  16 +-
 .../calcite/sql/SqlWithinGroupOperator.java     |  85 ++++++++
 .../sql/fun/SqlAbstractGroupFunction.java       |   3 +-
 .../calcite/sql/fun/SqlAnyValueAggFunction.java |   4 +-
 .../calcite/sql/fun/SqlAvgAggFunction.java      |   7 +-
 .../calcite/sql/fun/SqlCountAggFunction.java    |   7 +-
 .../calcite/sql/fun/SqlCovarAggFunction.java    |   4 +-
 .../sql/fun/SqlFirstLastValueAggFunction.java   |   4 +-
 .../sql/fun/SqlHistogramAggFunction.java        |   4 +-
 .../calcite/sql/fun/SqlLeadLagAggFunction.java  |   4 +-
 .../calcite/sql/fun/SqlMinMaxAggFunction.java   |   4 +-
 .../calcite/sql/fun/SqlNthValueAggFunction.java |   3 +-
 .../calcite/sql/fun/SqlNtileAggFunction.java    |   4 +-
 .../sql/fun/SqlSingleValueAggFunction.java      |   4 +-
 .../calcite/sql/fun/SqlStdOperatorTable.java    |  11 +-
 .../calcite/sql/fun/SqlSumAggFunction.java      |   4 +-
 .../sql/fun/SqlSumEmptyIsZeroAggFunction.java   |   4 +-
 .../apache/calcite/sql/validate/AggChecker.java |   4 +
 .../sql/validate/SqlUserDefinedAggFunction.java |   7 +-
 .../calcite/sql/validate/SqlValidator.java      |   8 +-
 .../calcite/sql/validate/SqlValidatorImpl.java  |  39 +++-
 .../apache/calcite/sql2rel/RelFieldTrimmer.java |   7 +-
 .../calcite/sql2rel/SqlToRelConverter.java      |  67 ++++++-
 .../org/apache/calcite/tools/RelBuilder.java    |  65 +++++--
 .../org/apache/calcite/util/BuiltInMethod.java  |  20 +-
 .../org/apache/calcite/util/Optionality.java    |  41 ++++
 .../calcite/runtime/CalciteResource.properties  |   4 +
 .../org/apache/calcite/plan/RelWriterTest.java  |   5 +-
 .../plan/volcano/TraitPropagationTest.java      |   3 +-
 .../rel/rel2sql/RelToSqlConverterTest.java      |  51 ++++-
 .../calcite/sql/parser/SqlParserTest.java       |  58 ++++++
 .../calcite/sql/test/SqlOperatorBaseTest.java   |   2 +
 .../java/org/apache/calcite/test/JdbcTest.java  |  75 +++++++
 .../apache/calcite/test/RelMetadataTest.java    |   2 +-
 .../calcite/test/SqlToRelConverterTest.java     |  28 +++
 .../apache/calcite/test/SqlValidatorTest.java   |  35 ++++
 .../org/apache/calcite/tools/PlannerTest.java   |   4 +-
 .../calcite/test/SqlToRelConverterTest.xml      |  38 ++++
 core/src/test/resources/sql/agg.iq              | 123 ++++++++++++
 .../calcite/adapter/druid/DruidRules.java       |  11 +-
 pom.xml                                         |   2 +-
 site/_docs/algebra.md                           |  26 +--
 site/_docs/reference.md                         |  12 +-
 73 files changed, 1612 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/codegen/templates/Parser.jj
----------------------------------------------------------------------
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index bcc9e56..7bfc02c 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -4976,6 +4976,8 @@ SqlNode NamedFunctionCall() :
     final SqlNode filter;
     final SqlNode over;
     SqlLiteral quantifier = null;
+    SqlNodeList orderList = null;
+    final Span withinGroupSpan;
 }
 {
     (
@@ -5004,10 +5006,19 @@ SqlNode NamedFunctionCall() :
         }
     )
     {
-        call = createCall(qualifiedName, s.end(this), funcType, quantifier,
-            args);
+        call = createCall(qualifiedName, s.end(this), funcType, quantifier, args);
     }
     [
+        <WITHIN> { withinGroupSpan = span(); }
+        <GROUP>
+        <LPAREN>
+        orderList = OrderBy(true)
+        <RPAREN> {
+            call = SqlStdOperatorTable.WITHIN_GROUP.createCall(
+                withinGroupSpan.end(this), call, orderList);
+        }
+    ]
+    [
         <FILTER> { filterSpan = span(); }
         <LPAREN>
         <WHERE>

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/adapter/enumerable/AggImpState.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/AggImpState.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/AggImpState.java
index b100cec..aac5277 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/AggImpState.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/AggImpState.java
@@ -31,6 +31,7 @@ public class AggImpState {
   public AggContext context;
   public Expression result;
   public List<Expression> state;
+  public Expression accumulatorAdder;
 
   public AggImpState(int aggIdx, AggregateCall call, boolean windowContext) {
     this.aggIdx = aggIdx;

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/adapter/enumerable/AggregateLambdaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/AggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/AggregateLambdaFactory.java
new file mode 100644
index 0000000..c5f87b4
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/AggregateLambdaFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.function.Function0;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+/**
+ * Generates lambda functions used in {@link EnumerableAggregate}.
+ *
+ * <p>This interface allows an implicit accumulator type variation
+ * ({@code TOrigAccumulate} {@literal ->} {@code TAccumulate}).
+ *
+ * @param <TSource> Type of the enumerable input source
+ * @param <TOrigAccumulate> Type of the original accumulator
+ * @param <TAccumulate> Type of the varied accumulator
+ * @param <TResult> Type of the enumerable output result
+ * @param <TKey> Type of the group-by key
+ */
+public interface AggregateLambdaFactory<TSource, TOrigAccumulate, TAccumulate,
+    TResult, TKey> {
+  Function0<TAccumulate> accumulatorInitializer();
+
+  Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder();
+
+  Function1<TAccumulate, TResult> singleGroupResultSelector(
+      Function1<TOrigAccumulate, TResult> resultSelector);
+
+  Function2<TKey, TAccumulate, TResult> resultSelector(
+      Function2<TKey, TOrigAccumulate, TResult> resultSelector);
+}
+
+// End AggregateLambdaFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
index 49af4ba..1caf4e3 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
@@ -33,6 +33,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -52,6 +53,7 @@ import com.google.common.collect.ImmutableList;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 /** Implementation of {@link org.apache.calcite.rel.core.Aggregate} in
@@ -231,37 +233,10 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
         PhysTypeImpl.of(typeFactory,
             typeFactory.createSyntheticType(aggStateTypes));
 
-
-    if (accPhysType.getJavaRowType() instanceof JavaTypeFactoryImpl.SyntheticRecordType) {
-      // We have to initialize the SyntheticRecordType instance this way, to avoid using
-      // class constructor with too many parameters.
-      JavaTypeFactoryImpl.SyntheticRecordType synType =
-          (JavaTypeFactoryImpl.SyntheticRecordType)
-          accPhysType.getJavaRowType();
-      final ParameterExpression record0_ =
-          Expressions.parameter(accPhysType.getJavaRowType(), "record0");
-      initBlock.add(Expressions.declare(0, record0_, null));
-      initBlock.add(
-          Expressions.statement(
-              Expressions.assign(record0_,
-                  Expressions.new_(accPhysType.getJavaRowType()))));
-      List<Types.RecordField> fieldList = synType.getRecordFields();
-      for (int i = 0; i < initExpressions.size(); i++) {
-        Expression right = initExpressions.get(i);
-        initBlock.add(
-            Expressions.statement(
-                Expressions.assign(
-                    Expressions.field(record0_, fieldList.get(i)),
-                    right)));
-      }
-      initBlock.add(record0_);
-    } else {
-      initBlock.add(accPhysType.record(initExpressions));
-    }
+    declareParentAccumulator(initExpressions, initBlock, accPhysType);
 
     final Expression accumulatorInitializer =
-        builder.append(
-            "accumulatorInitializer",
+        builder.append("accumulatorInitializer",
             Expressions.lambda(
                 Function0.class,
                 initBlock.toBlock()));
@@ -274,12 +249,12 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
     //             return acc;
     //         }
     //     };
-    final BlockBuilder builder2 = new BlockBuilder();
     final ParameterExpression inParameter =
         Expressions.parameter(inputPhysType.getJavaRowType(), "in");
     final ParameterExpression acc_ =
         Expressions.parameter(accPhysType.getJavaRowType(), "acc");
     for (int i = 0, stateOffset = 0; i < aggs.size(); i++) {
+      final BlockBuilder builder2 = new BlockBuilder();
       final AggImpState agg = aggs.get(i);
 
       final int stateSize = agg.state.size();
@@ -315,24 +290,25 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
                   currentBlock(),
                   new RexToLixTranslator.InputGetterImpl(
                       Collections.singletonList(
-                          Pair.of((Expression) inParameter, inputPhysType))),
+                          Pair.of(inParameter, inputPhysType))),
                   implementor.getConformance())
                   .setNullable(currentNullables());
             }
           };
 
       agg.implementor.implementAdd(agg.context, addContext);
+      builder2.add(acc_);
+      agg.accumulatorAdder = builder.append("accumulatorAdder",
+          Expressions.lambda(Function2.class, builder2.toBlock(), acc_,
+              inParameter));
     }
-    builder2.add(acc_);
-    final Expression accumulatorAdder =
-        builder.append(
-            "accumulatorAdder",
-            Expressions.lambda(
-                Function2.class,
-                builder2.toBlock(),
-                acc_,
-                inParameter));
 
+    final ParameterExpression lambdaFactory =
+        Expressions.parameter(AggregateLambdaFactory.class,
+            builder.newName("lambdaFactory"));
+
+    implementLambdaFactory(builder, inputPhysType, aggs, accumulatorInitializer,
+        hasOrderedCall(aggs), lambdaFactory);
     // Function2<Integer, Object[], Object[]> resultSelector =
     //     new Function2<Integer, Object[], Object[]>() {
     //         public Object[] apply(Integer key, Object[] acc) {
@@ -390,9 +366,13 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
                   BuiltInMethod.GROUP_BY_MULTIPLE.method,
                   Expressions.list(childExp,
                       keySelectors_,
-                      accumulatorInitializer,
-                      accumulatorAdder,
-                      resultSelector)
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_INITIALIZER.method),
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_ADDER.method),
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_RESULT_SELECTOR.method,
+                          resultSelector))
                       .appendIfNotNull(keyPhysType.comparer()))));
     } else if (groupCount == 0) {
       final Expression resultSelector =
@@ -410,9 +390,15 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
                   Expressions.call(
                       childExp,
                       BuiltInMethod.AGGREGATE.method,
-                      Expressions.call(accumulatorInitializer, "apply"),
-                      accumulatorAdder,
-                      resultSelector))));
+                      Expressions.call(
+                          Expressions.call(lambdaFactory,
+                              BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_INITIALIZER.method),
+                          BuiltInMethod.FUNCTION0_APPLY.method),
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_ADDER.method),
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_SINGLE_GROUP_RESULT_SELECTOR.method,
+                          resultSelector)))));
     } else if (aggCalls.isEmpty()
         && groupSet.equals(
             ImmutableBitSet.range(child.getRowType().getFieldCount()))) {
@@ -441,14 +427,125 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
               Expressions.call(childExp,
                   BuiltInMethod.GROUP_BY2.method,
                   Expressions.list(keySelector_,
-                      accumulatorInitializer,
-                      accumulatorAdder,
-                      resultSelector_)
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_INITIALIZER.method),
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_ADDER.method),
+                      Expressions.call(lambdaFactory,
+                          BuiltInMethod.AGG_LAMBDA_FACTORY_ACC_RESULT_SELECTOR.method,
+                          resultSelector_))
                       .appendIfNotNull(keyPhysType.comparer()))));
     }
     return implementor.result(physType, builder.toBlock());
   }
 
+  private static boolean hasOrderedCall(List<AggImpState> aggs) {
+    for (AggImpState agg : aggs) {
+      if (!agg.call.collation.equals(RelCollations.EMPTY)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void declareParentAccumulator(List<Expression> initExpressions,
+      BlockBuilder initBlock, PhysType accPhysType) {
+    if (accPhysType.getJavaRowType()
+        instanceof JavaTypeFactoryImpl.SyntheticRecordType) {
+      // We have to initialize the SyntheticRecordType instance this way, to
+      // avoid using a class constructor with too many parameters.
+      final JavaTypeFactoryImpl.SyntheticRecordType synType =
+          (JavaTypeFactoryImpl.SyntheticRecordType)
+          accPhysType.getJavaRowType();
+      final ParameterExpression record0_ =
+          Expressions.parameter(accPhysType.getJavaRowType(), "record0");
+      initBlock.add(Expressions.declare(0, record0_, null));
+      initBlock.add(
+          Expressions.statement(
+              Expressions.assign(record0_,
+                  Expressions.new_(accPhysType.getJavaRowType()))));
+      List<Types.RecordField> fieldList = synType.getRecordFields();
+      for (int i = 0; i < initExpressions.size(); i++) {
+        Expression right = initExpressions.get(i);
+        initBlock.add(
+            Expressions.statement(
+                Expressions.assign(
+                    Expressions.field(record0_, fieldList.get(i)), right)));
+      }
+      initBlock.add(record0_);
+    } else {
+      initBlock.add(accPhysType.record(initExpressions));
+    }
+  }
+
+  /**
+   * Implements the {@link AggregateLambdaFactory}.
+   *
+   * <p>Behavior depends upon ordering:
+   * <ul>
+   *
+   * <li>{@code hasOrderedCall == true} means there is at least one aggregate
+   * call that includes a sort spec. We use {@link OrderedAggregateLambdaFactory}
+   * to implement sorted aggregates for such calls.
+   *
+   * <li>{@code hasOrderedCall == false} indicates to use
+   * {@link SequencedAdderAggregateLambdaFactory} to implement a non-sort
+   * aggregate.
+   *
+   * </ul>
+   */
+  private void implementLambdaFactory(BlockBuilder builder,
+      PhysType inputPhysType,
+      List<AggImpState> aggs,
+      Expression accumulatorInitializer,
+      boolean hasOrderedCall,
+      ParameterExpression lambdaFactory) {
+    if (hasOrderedCall) {
+      ParameterExpression pe = Expressions.parameter(List.class,
+          builder.newName("sourceSorters"));
+      builder.add(
+          Expressions.declare(0, pe, Expressions.new_(LinkedList.class)));
+
+      for (AggImpState agg : aggs) {
+        if (agg.call.collation.equals(RelCollations.EMPTY)) {
+          continue;
+        }
+        final Pair<Expression, Expression> pair =
+            inputPhysType.generateCollationKey(
+                agg.call.collation.getFieldCollations());
+        builder.add(
+            Expressions.statement(
+                Expressions.call(pe,
+                    BuiltInMethod.COLLECTION_ADD.method,
+                    Expressions.new_(BuiltInMethod.SOURCE_SORTER.constructor,
+                        agg.accumulatorAdder, pair.left, pair.right))));
+      }
+      builder.add(
+          Expressions.declare(0, lambdaFactory,
+              Expressions.new_(
+                  BuiltInMethod.ORDERED_AGGREGATE_LAMBDA_FACTORY.constructor,
+                  accumulatorInitializer, pe)));
+    } else {
+      // when hasOrderedCall == false
+      ParameterExpression pe = Expressions.parameter(List.class,
+          builder.newName("accumulatorAdders"));
+      builder.add(
+          Expressions.declare(0, pe, Expressions.new_(LinkedList.class)));
+
+      for (AggImpState agg : aggs) {
+        builder.add(
+            Expressions.statement(
+                Expressions.call(pe, BuiltInMethod.COLLECTION_ADD.method,
+                    agg.accumulatorAdder)));
+      }
+      builder.add(
+          Expressions.declare(0, lambdaFactory,
+              Expressions.new_(
+                  BuiltInMethod.SEQUENCED_ADDER_AGGREGATE_LAMBDA_FACTORY.constructor,
+                  accumulatorInitializer, pe)));
+    }
+  }
+
   /** An implementation of {@link AggContext}. */
   private class AggContextImpl implements AggContext {
     private final AggImpState agg;

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
new file mode 100644
index 0000000..3c7c72a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.function.Function0;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Generates aggregate lambdas that sort the input source before calling each
+ * aggregate adder.
+ *
+ * @param <TSource> Type of the enumerable input source
+ * @param <TKey> Type of the group-by key
+ * @param <TSortKey> Type of the sort key
+ * @param <TOrigAccumulate> Type of the original accumulator
+ * @param <TResult> Type of the enumerable output result
+ */
+public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
+    TOrigAccumulate, TResult>
+    implements AggregateLambdaFactory<TSource, TOrigAccumulate,
+    OrderedAggregateLambdaFactory.LazySource<TSource>, TResult, TKey> {
+
+  private final Function0<TOrigAccumulate> accumulatorInitializer;
+  private final List<SourceSorter<TOrigAccumulate, TSource, TSortKey>> sourceSorters;
+
+  public OrderedAggregateLambdaFactory(
+      Function0<TOrigAccumulate> accumulatorInitializer,
+      List<SourceSorter<TOrigAccumulate, TSource, TSortKey>> sourceSorters) {
+    this.accumulatorInitializer = accumulatorInitializer;
+    this.sourceSorters = sourceSorters;
+  }
+
+  public Function0<LazySource<TSource>> accumulatorInitializer() {
+    return LazySource::new;
+  }
+
+  public Function2<LazySource<TSource>,
+      TSource, LazySource<TSource>> accumulatorAdder() {
+    return (lazySource, source) -> {
+      lazySource.add(source);
+      return lazySource;
+    };
+  }
+
+  public Function1<LazySource<TSource>, TResult> singleGroupResultSelector(
+      Function1<TOrigAccumulate, TResult> resultSelector) {
+    return lazySource -> {
+      final TOrigAccumulate accumulator = accumulatorInitializer.apply();
+      for (SourceSorter<TOrigAccumulate, TSource, TSortKey> acc : sourceSorters) {
+        acc.sortAndAccumulate(lazySource, accumulator);
+      }
+      return resultSelector.apply(accumulator);
+    };
+  }
+
+  public Function2<TKey, LazySource<TSource>, TResult> resultSelector(
+      Function2<TKey, TOrigAccumulate, TResult> resultSelector) {
+    return (groupByKey, lazySource) -> {
+      final TOrigAccumulate accumulator = accumulatorInitializer.apply();
+      for (SourceSorter<TOrigAccumulate, TSource, TSortKey> acc : sourceSorters) {
+        acc.sortAndAccumulate(lazySource, accumulator);
+      }
+      return resultSelector.apply(groupByKey, accumulator);
+    };
+  }
+
+  /**
+   * Caches the input sources. (They are sorted and aggregated in the result selector.)
+   *
+   * @param <TSource> Type of the enumerable input source.
+   */
+  public static class LazySource<TSource> implements Iterable<TSource> {
+    private final List<TSource> list = new ArrayList<>();
+
+    private void add(TSource source) {
+      list.add(source);
+    }
+
+    @Override public Iterator<TSource> iterator() {
+      return list.iterator();
+    }
+  }
+
+}
+
+// End OrderedAggregateLambdaFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
new file mode 100644
index 0000000..4b4577c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.function.Function0;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+import java.util.List;
+
+/**
+ * Implementation of {@link AggregateLambdaFactory} that applies a sequence of
+ * accumulator adders to input source.
+ *
+ * @param <TSource> Type of the enumerable input source
+ * @param <TAccumulate> Type of the accumulator
+ * @param <TResult> Type of the enumerable output result
+ * @param <TKey> Type of the group-by key
+ */
+public class SequencedAdderAggregateLambdaFactory<TSource, TAccumulate, TResult, TKey>
+    implements AggregateLambdaFactory<TSource, TAccumulate, TAccumulate, TResult, TKey> {
+
+  private final Function0<TAccumulate> accumulatorInitializer;
+  private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdderDecorator;
+
+  public SequencedAdderAggregateLambdaFactory(
+      Function0<TAccumulate> accumulatorInitializer,
+      List<Function2<TAccumulate, TSource, TAccumulate>> accumulatorAdders) {
+    this.accumulatorInitializer = accumulatorInitializer;
+    this.accumulatorAdderDecorator = new AccumulatorAdderSeq(accumulatorAdders);
+  }
+
+  @Override public Function0<TAccumulate> accumulatorInitializer() {
+    return accumulatorInitializer;
+  }
+
+  @Override public Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder() {
+    return accumulatorAdderDecorator;
+  }
+
+  @Override public Function1<TAccumulate, TResult> singleGroupResultSelector(
+      Function1<TAccumulate, TResult> resultSelector) {
+    return resultSelector;
+  }
+
+  @Override public Function2<TKey, TAccumulate, TResult> resultSelector(
+      Function2<TKey, TAccumulate, TResult> resultSelector) {
+    return resultSelector;
+  }
+
+  /**
+   * Decorator class of a sequence of accumulator adder functions.
+   */
+  private class AccumulatorAdderSeq
+      implements Function2<TAccumulate, TSource, TAccumulate> {
+    private final List<Function2<TAccumulate, TSource, TAccumulate>> accumulatorAdders;
+
+    AccumulatorAdderSeq(
+        List<Function2<TAccumulate, TSource, TAccumulate>> accumulatorAdders) {
+      this.accumulatorAdders = accumulatorAdders;
+    }
+
+    @Override public TAccumulate apply(TAccumulate accumulator, TSource source) {
+      // Chain adders: each receives the accumulator returned by the previous one.
+      TAccumulate result = accumulator;
+      for (Function2<TAccumulate, TSource, TAccumulate> adder : accumulatorAdders) {
+        result = adder.apply(result, source);
+      }
+      return result;
+    }
+  }
+}
+
+// End SequencedAdderAggregateLambdaFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
new file mode 100644
index 0000000..9345da4
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Helper that combines the sorting process and accumulating process against the
+ * aggregate execution, used with {@link OrderedAggregateLambdaFactory}.
+ *
+ * @param <TAccumulate> Type of the accumulator
+ * @param <TSource> Type of the enumerable input source
+ * @param <TSortKey> Type of the sort key
+ */
+public class SourceSorter<TAccumulate, TSource, TSortKey> {
+  private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+  private final Function1<TSource, TSortKey> keySelector;
+  private final Comparator<TSortKey> comparator;
+
+  public SourceSorter(Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      Function1<TSource, TSortKey> keySelector, Comparator<TSortKey> comparator) {
+    this.accumulatorAdder = accumulatorAdder;
+    this.keySelector = keySelector;
+    this.comparator = comparator;
+  }
+
+  /** Sorts the source rows by sort key and then feeds them, in that order,
+   * to the accumulator adder. The value returned by the final adder call is
+   * not handed back to the caller. */
+  public void sortAndAccumulate(Iterable<TSource> sourceIterable,
+      TAccumulate accumulator) {
+    final List<TSource> ordered =
+        Linq4j.asEnumerable(sourceIterable).orderBy(keySelector, comparator).toList();
+    TAccumulate current = accumulator;
+    for (TSource row : ordered) {
+      current = accumulatorAdder.apply(current, row);
+    }
+  }
+}
+
+// End SourceSorter.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index cda1e66..e15e491 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -18,6 +18,7 @@ package org.apache.calcite.plan;
 
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttle;
@@ -475,6 +476,7 @@ public abstract class RelOptUtil {
               false,
               ImmutableList.of(0),
               -1,
+              RelCollations.EMPTY,
               0,
               ret,
               null,
@@ -567,6 +569,7 @@ public abstract class RelOptUtil {
             false,
             ImmutableList.of(projectedKeyCount),
             -1,
+            RelCollations.EMPTY,
             projectedKeyCount,
             ret,
             null,
@@ -765,15 +768,14 @@ public abstract class RelOptUtil {
   public static RelNode createSingleValueAggRel(
       RelOptCluster cluster,
       RelNode rel) {
-    // assert (rel.getRowType().getFieldCount() == 1);
     final int aggCallCnt = rel.getRowType().getFieldCount();
     final List<AggregateCall> aggCalls = new ArrayList<>();
 
     for (int i = 0; i < aggCallCnt; i++) {
       aggCalls.add(
-          AggregateCall.create(
-              SqlStdOperatorTable.SINGLE_VALUE, false, false,
-              ImmutableList.of(i), -1, 0, rel, null, null));
+          AggregateCall.create(SqlStdOperatorTable.SINGLE_VALUE, false, false,
+              ImmutableList.of(i), -1, RelCollations.EMPTY, 0, rel, null,
+              null));
     }
 
     return LogicalAggregate.create(rel, ImmutableBitSet.of(), null, aggCalls);

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java b/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
index f37d3cd..4212fae 100644
--- a/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
+++ b/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
@@ -61,7 +61,6 @@ import org.apache.calcite.util.trace.CalciteTrace;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
@@ -1247,17 +1246,10 @@ public class SubstitutionVisitor {
     ImmutableList<ImmutableBitSet> groupSets =
         Mappings.apply2(mapping, aggregate.groupSets);
     List<AggregateCall> aggregateCalls =
-        apply(mapping, aggregate.aggCalls);
+        Util.transform(aggregate.aggCalls, call -> call.transform(mapping));
     return MutableAggregate.of(input, groupSet, groupSets, aggregateCalls);
   }
 
-  private static List<AggregateCall> apply(final Mapping mapping,
-      List<AggregateCall> aggCallList) {
-    return Lists.transform(aggCallList,
-        call -> call.copy(Mappings.apply2(mapping, call.getArgList()),
-            Mappings.apply(mapping, call.filterArg)));
-  }
-
   public static MutableRel unifyAggregates(MutableAggregate query,
       MutableAggregate target) {
     if (query.getGroupType() != Aggregate.Group.SIMPLE
@@ -1304,7 +1296,8 @@ public class SubstitutionVisitor {
             AggregateCall.create(getRollup(aggregateCall.getAggregation()),
                 aggregateCall.isDistinct(), aggregateCall.isApproximate(),
                 ImmutableList.of(target.groupSet.cardinality() + i), -1,
-                aggregateCall.type, aggregateCall.name));
+                aggregateCall.collation, aggregateCall.type,
+                aggregateCall.name));
       }
       result = MutableAggregate.of(target, groupSet.build(), null,
           aggregateCalls);

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/prepare/CalciteCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/CalciteCatalogReader.java b/core/src/main/java/org/apache/calcite/prepare/CalciteCatalogReader.java
index cbfd145..9685239 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalciteCatalogReader.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalciteCatalogReader.java
@@ -60,6 +60,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Optionality;
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
@@ -324,7 +325,8 @@ public class CalciteCatalogReader implements Prepare.CatalogReader {
     } else if (function instanceof AggregateFunction) {
       return new SqlUserDefinedAggFunction(name,
           infer((AggregateFunction) function), InferTypes.explicit(argTypes),
-          typeChecker, (AggregateFunction) function, false, false, typeFactory);
+          typeChecker, (AggregateFunction) function, false, false,
+          Optionality.FORBIDDEN, typeFactory);
     } else if (function instanceof TableMacro) {
       return new SqlUserDefinedTableMacro(name, ReturnTypes.CURSOR,
           InferTypes.explicit(argTypes), typeChecker, paramTypes,

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/RelCollations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/RelCollations.java b/core/src/main/java/org/apache/calcite/rel/RelCollations.java
index c3b16c6..5d7c7bb 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelCollations.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelCollations.java
@@ -19,6 +19,7 @@ package org.apache.calcite.rel;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mappings;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -26,6 +27,7 @@ import com.google.common.collect.Lists;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -186,6 +188,24 @@ public class RelCollations {
     }
     return new RelCollationImpl(fieldCollations.build());
   }
+
+  /** Creates a copy of {@code collation} in which each field ordinal is
+   * replaced by the value that {@code mapping} holds for it. */
+  public static RelCollation permute(RelCollation collation,
+      Map<Integer, Integer> mapping) {
+    return of(
+        Util.transform(collation.getFieldCollations(),
+            fc -> fc.copy(mapping.get(fc.getFieldIndex()))));
+  }
+
+  /** Creates a copy of {@code collation} in which each field ordinal is
+   * replaced by its target under {@code mapping}. */
+  public static RelCollation permute(RelCollation collation,
+      Mappings.TargetMapping mapping) {
+    return of(
+        Util.transform(collation.getFieldCollations(),
+            fc -> fc.copy(mapping.getTarget(fc.getFieldIndex()))));
+  }
 }
 
 // End RelCollations.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java b/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
index 5df2a51..fb32cbc 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.rel.core;
 
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -30,8 +32,8 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * Call to an aggFunction function within an
- * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ * Call to an aggregate function within an
+ * {@link org.apache.calcite.rel.core.Aggregate}.
  */
 public class AggregateCall {
   //~ Instance fields --------------------------------------------------------
@@ -47,6 +49,7 @@ public class AggregateCall {
   // since all values are small, ImmutableList uses cached Integer values.
   private final ImmutableList<Integer> argList;
   public final int filterArg;
+  public final RelCollation collation;
 
   //~ Constructors -----------------------------------------------------------
 
@@ -66,7 +69,8 @@ public class AggregateCall {
       List<Integer> argList,
       RelDataType type,
       String name) {
-    this(aggFunction, distinct, false, argList, -1, type, name);
+    this(aggFunction, distinct, false,
+        argList, -1, RelCollations.EMPTY, type, name);
   }
 
   /**
@@ -76,23 +80,22 @@ public class AggregateCall {
    * @param distinct    Whether distinct
    * @param approximate Whether approximate
    * @param argList     List of ordinals of arguments
-   * @param filterArg   Ordinal of filter argument, or -1
+   * @param filterArg   Ordinal of filter argument (the
+   *                    {@code FILTER (WHERE ...)} clause in SQL), or -1
+   * @param collation   How to sort values before aggregation (the
+   *                    {@code WITHIN GROUP} clause in SQL)
    * @param type        Result type
    * @param name        Name (may be null)
    */
-  private AggregateCall(
-      SqlAggFunction aggFunction,
-      boolean distinct,
-      boolean approximate,
-      List<Integer> argList,
-      int filterArg,
-      RelDataType type,
-      String name) {
+  private AggregateCall(SqlAggFunction aggFunction, boolean distinct,
+      boolean approximate, List<Integer> argList, int filterArg,
+      RelCollation collation, RelDataType type, String name) {
     this.type = Objects.requireNonNull(type);
     this.name = name;
     this.aggFunction = Objects.requireNonNull(aggFunction);
     this.argList = ImmutableList.copyOf(argList);
     this.filterArg = filterArg;
+    this.collation = Objects.requireNonNull(collation);
     this.distinct = distinct;
     this.approximate = approximate;
   }
@@ -103,23 +106,32 @@ public class AggregateCall {
   public static AggregateCall create(SqlAggFunction aggFunction,
       boolean distinct, List<Integer> argList, int groupCount, RelNode input,
       RelDataType type, String name) {
-    return create(aggFunction, distinct, false, argList, -1, groupCount, input,
-        type, name);
+    return create(aggFunction, distinct, false, argList, -1,
+        RelCollations.EMPTY, groupCount, input, type, name);
   }
 
   @Deprecated // to be removed before 2.0
   public static AggregateCall create(SqlAggFunction aggFunction,
       boolean distinct, List<Integer> argList, int filterArg, int groupCount,
       RelNode input, RelDataType type, String name) {
-    return create(aggFunction, distinct, false, argList, -1, groupCount, input,
-        type, name);
+    return create(aggFunction, distinct, false, argList, filterArg,
+        RelCollations.EMPTY, groupCount, input, type, name);
   }
 
-  /** Creates an AggregateCall, inferring its type if {@code type} is null. */
+  @Deprecated // to be removed before 2.0
   public static AggregateCall create(SqlAggFunction aggFunction,
       boolean distinct, boolean approximate, List<Integer> argList,
       int filterArg, int groupCount,
       RelNode input, RelDataType type, String name) {
+    return create(aggFunction, distinct, approximate, argList,
+        filterArg, RelCollations.EMPTY, groupCount, input, type, name);
+  }
+
+  /** Creates an AggregateCall, inferring its type if {@code type} is null. */
+  public static AggregateCall create(SqlAggFunction aggFunction,
+      boolean distinct, boolean approximate, List<Integer> argList,
+      int filterArg, RelCollation collation, int groupCount,
+      RelNode input, RelDataType type, String name) {
     if (type == null) {
       final RelDataTypeFactory typeFactory =
           input.getCluster().getTypeFactory();
@@ -130,23 +142,32 @@ public class AggregateCall {
               groupCount, filterArg >= 0);
       type = aggFunction.inferReturnType(callBinding);
     }
-    return create(aggFunction, distinct, approximate, argList, filterArg, type,
-        name);
+    return create(aggFunction, distinct, approximate, argList, filterArg,
+        collation, type, name);
   }
 
   @Deprecated // to be removed before 2.0
   public static AggregateCall create(SqlAggFunction aggFunction,
       boolean distinct, List<Integer> argList, int filterArg, RelDataType type,
       String name) {
-    return create(aggFunction, distinct, false, argList, filterArg, type, name);
+    return create(aggFunction, distinct, false, argList, filterArg,
+        RelCollations.EMPTY, type, name);
   }
 
-  /** Creates an AggregateCall. */
+  @Deprecated // to be removed before 2.0
   public static AggregateCall create(SqlAggFunction aggFunction,
       boolean distinct, boolean approximate, List<Integer> argList,
       int filterArg, RelDataType type, String name) {
+    return create(aggFunction, distinct, approximate, argList, filterArg,
+        RelCollations.EMPTY, type, name);
+  }
+
+  /** Creates an AggregateCall. */
+  public static AggregateCall create(SqlAggFunction aggFunction,
+      boolean distinct, boolean approximate, List<Integer> argList,
+      int filterArg, RelCollation collation, RelDataType type, String name) {
     return new AggregateCall(aggFunction, distinct, approximate, argList,
-        filterArg, type, name);
+        filterArg, collation, type, name);
   }
 
   /**
@@ -179,6 +200,16 @@ public class AggregateCall {
   }
 
   /**
+   * Returns the aggregate ordering definition (the {@code WITHIN GROUP} clause
+   * in SQL), or an empty collation if not specified.
+   *
+   * @return ordering definition; never null
+   */
+  public RelCollation getCollation() {
+    return collation;
+  }
+
+  /**
    * Returns the ordinals of the arguments to this call.
    *
    * <p>The list is immutable.
@@ -216,8 +247,10 @@ public class AggregateCall {
     if (Objects.equals(this.name, name)) {
       return this;
     }
-    return new AggregateCall(aggFunction, distinct, approximate, argList,
-        filterArg, type, name);
+    // Carry over the collation: renaming must not drop the WITHIN GROUP order.
+    return new AggregateCall(aggFunction, distinct, approximate, argList,
+        filterArg, collation, type,
+        name);
   }
 
   public String toString() {
@@ -235,6 +268,11 @@ public class AggregateCall {
       buf.append(arg);
     }
     buf.append(")");
+    if (!collation.equals(RelCollations.EMPTY)) {
+      buf.append(" WITHIN GROUP (");
+      buf.append(collation);
+      buf.append(")");
+    }
     if (hasFilter()) {
       buf.append(" FILTER $");
       buf.append(filterArg);
@@ -257,11 +295,12 @@ public class AggregateCall {
     return aggFunction.equals(other.aggFunction)
         && (distinct == other.distinct)
         && argList.equals(other.argList)
-        && filterArg == other.filterArg;
+        && filterArg == other.filterArg
+        && Objects.equals(collation, other.collation);
   }
 
   @Override public int hashCode() {
-    return Objects.hash(aggFunction, distinct, argList, filterArg);
+    return Objects.hash(aggFunction, distinct, argList, filterArg, collation);
   }
 
   /**
@@ -282,17 +321,27 @@ public class AggregateCall {
   /**
    * Creates an equivalent AggregateCall with new argument ordinals.
    *
+   * @see #transform(Mappings.TargetMapping)
+   *
    * @param args Arguments
    * @return AggregateCall that suits new inputs and GROUP BY columns
    */
-  public AggregateCall copy(List<Integer> args, int filterArg) {
+  public AggregateCall copy(List<Integer> args, int filterArg,
+      RelCollation collation) {
     return new AggregateCall(aggFunction, distinct, approximate, args,
-        filterArg, type, name);
+        filterArg, collation, type, name);
+  }
+
+  @Deprecated // to be removed before 2.0
+  public AggregateCall copy(List<Integer> args, int filterArg) {
+    // ignoring collation is error-prone
+    return copy(args, filterArg, collation);
   }
 
   @Deprecated // to be removed before 2.0
   public AggregateCall copy(List<Integer> args) {
-    return copy(args, filterArg);
+    // ignoring filterArg and collation is error-prone
+    return copy(args, filterArg, collation);
   }
 
   /**
@@ -317,14 +366,15 @@ public class AggregateCall {
             ? type
             : null;
     return create(aggFunction, distinct, approximate, argList, filterArg,
-        newGroupKeyCount, input, newType, getName());
+        collation, newGroupKeyCount, input, newType, getName());
   }
 
   /** Creates a copy of this aggregate call, applying a mapping to its
    * arguments. */
   public AggregateCall transform(Mappings.TargetMapping mapping) {
     return copy(Mappings.apply2((Mapping) mapping, argList),
-        hasFilter() ? Mappings.apply(mapping, filterArg) : -1);
+        hasFilter() ? Mappings.apply(mapping, filterArg) : -1,
+        RelCollations.permute(collation, mapping));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/core/Window.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Window.java b/core/src/main/java/org/apache/calcite/rel/core/Window.java
index 1e00bba..186f7d3 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Window.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Window.java
@@ -316,6 +316,7 @@ public abstract class Window extends SingleRel {
           final SqlAggFunction op = (SqlAggFunction) aggCall.getOperator();
           return AggregateCall.create(op, aggCall.distinct,
               false, getProjectOrdinals(aggCall.getOperands()), -1,
+              RelCollations.EMPTY,
               aggCall.getType(), fieldNames.get(aggCall.ordinal));
         }
       };

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/externalize/RelJsonReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/externalize/RelJsonReader.java b/core/src/main/java/org/apache/calcite/rel/externalize/RelJsonReader.java
index 695ab0b..29c3f44 100644
--- a/core/src/main/java/org/apache/calcite/rel/externalize/RelJsonReader.java
+++ b/core/src/main/java/org/apache/calcite/rel/externalize/RelJsonReader.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
@@ -280,7 +281,9 @@ public class RelJsonReader {
     final RelDataType type =
         relJson.toType(cluster.getTypeFactory(), jsonAggCall.get("type"));
     return AggregateCall.create(aggregation, distinct, false, operands,
-        filterOperand == null ? -1 : filterOperand, type, null);
+        filterOperand == null ? -1 : filterOperand,
+        RelCollations.EMPTY,
+        type, null);
   }
 
   private RelNode lookupInput(String jsonInput) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java b/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
index 0323a52..1de1255 100644
--- a/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
+++ b/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
@@ -768,6 +768,23 @@ public abstract class SqlImplementor {
       };
     }
 
+    void addOrderItem(List<SqlNode> orderByList, RelFieldCollation field) {
+      if (field.nullDirection != RelFieldCollation.NullDirection.UNSPECIFIED) {
+        final boolean first =
+            field.nullDirection == RelFieldCollation.NullDirection.FIRST;
+        SqlNode nullDirectionNode =
+            dialect.emulateNullDirection(field(field.getFieldIndex()),
+                first, field.direction.isDescending());
+        if (nullDirectionNode != null) {
+          orderByList.add(nullDirectionNode);
+          field = new RelFieldCollation(field.getFieldIndex(),
+              field.getDirection(),
+              RelFieldCollation.NullDirection.UNSPECIFIED);
+        }
+      }
+      orderByList.add(toSql(field));
+    }
+
     /** Converts a call to an aggregate function to an expression. */
     public SqlNode toSql(AggregateCall aggCall) {
       final SqlOperator op = aggCall.getAggregation();
@@ -778,16 +795,32 @@ public abstract class SqlImplementor {
       final SqlLiteral qualifier =
           aggCall.isDistinct() ? SqlSelectKeyword.DISTINCT.symbol(POS) : null;
       final SqlNode[] operands = operandList.toArray(new SqlNode[0]);
+      List<SqlNode> orderByList = Expressions.list();
+      for (RelFieldCollation field : aggCall.collation.getFieldCollations()) {
+        addOrderItem(orderByList, field);
+      }
+      SqlNodeList orderList = new SqlNodeList(orderByList, POS);
       if (op instanceof SqlSumEmptyIsZeroAggFunction) {
         final SqlNode node =
-            SqlStdOperatorTable.SUM.createCall(qualifier, POS, operands);
+            withOrder(
+                SqlStdOperatorTable.SUM.createCall(qualifier, POS, operands),
+                orderList);
         return SqlStdOperatorTable.COALESCE.createCall(POS, node,
             SqlLiteral.createExactNumeric("0", POS));
       } else {
-        return op.createCall(qualifier, POS, operands);
+        return withOrder(op.createCall(qualifier, POS, operands), orderList);
       }
     }
 
+    /** Wraps a call in a {@link SqlKind#WITHIN_GROUP} call, if
+     * {@code orderList} is non-empty. */
+    private SqlNode withOrder(SqlCall call, SqlNodeList orderList) {
+      if (orderList == null || orderList.size() == 0) {
+        return call;
+      }
+      return SqlStdOperatorTable.WITHIN_GROUP.createCall(POS, call, orderList);
+    }
+
     /** Converts a collation to an ORDER BY item. */
     public SqlNode toSql(RelFieldCollation collation) {
       SqlNode node = field(collation.getFieldIndex());
@@ -1217,19 +1250,7 @@ public abstract class SqlImplementor {
 
     public void addOrderItem(List<SqlNode> orderByList,
         RelFieldCollation field) {
-      if (field.nullDirection != RelFieldCollation.NullDirection.UNSPECIFIED) {
-        boolean first = field.nullDirection == RelFieldCollation.NullDirection.FIRST;
-        SqlNode nullDirectionNode =
-            dialect.emulateNullDirection(context.field(field.getFieldIndex()),
-                first, field.direction.isDescending());
-        if (nullDirectionNode != null) {
-          orderByList.add(nullDirectionNode);
-          field = new RelFieldCollation(field.getFieldIndex(),
-              field.getDirection(),
-              RelFieldCollation.NullDirection.UNSPECIFIED);
-        }
-      }
-      orderByList.add(context.toSql(field));
+      context.addOrderItem(orderByList, field);
     }
 
     public Result result() {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
index 68a9382..b164fe0 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AbstractMaterializedViewRule.java
@@ -1198,6 +1198,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
             rexBuilder.makeInputRef(relBuilder.peek(),
                 aggregate.getGroupCount() + i);
         aggregateCalls.add(
+            // TODO: handle aggregate ordering
             relBuilder.aggregateCall(rollupAgg, operand)
                 .distinct(aggCall.isDistinct())
                 .approximate(aggCall.isApproximate())
@@ -1469,6 +1470,7 @@ public abstract class AbstractMaterializedViewRule extends RelOptRule {
               }
               final RexInputRef operand = rexBuilder.makeInputRef(input, k);
               aggregateCalls.add(
+                  // TODO: handle aggregate ordering
                   relBuilder.aggregateCall(rollupAgg, operand)
                       .approximate(queryAggCall.isApproximate())
                       .distinct(queryAggCall.isDistinct())

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java
index 6d974d8..7175a5f 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java
@@ -20,6 +20,7 @@ import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Aggregate.Group;
@@ -296,7 +297,7 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
         final AggregateCall newCall =
             AggregateCall.create(aggCall.getAggregation(), false,
                 aggCall.isApproximate(), aggCall.getArgList(), -1,
-                bottomGroupSet.cardinality(),
+                aggCall.collation, bottomGroupSet.cardinality(),
                 relBuilder.peek(), null, aggCall.name);
         bottomAggregateCalls.add(newCall);
       }
@@ -324,6 +325,7 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
                 aggCall.isApproximate(),
                 newArgList,
                 -1,
+                aggCall.collation,
                 originalGroupSet.cardinality(),
                 relBuilder.peek(),
                 aggCall.getType(),
@@ -331,19 +333,18 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
       } else {
         // If aggregate B had a COUNT aggregate call the corresponding aggregate at
         // aggregate A must be SUM. For other aggregates, it remains the same.
-        final List<Integer> newArgs =
-            Lists.newArrayList(bottomGroups.size()
-                + nonDistinctAggCallProcessedSoFar);
+        final int arg = bottomGroups.size() + nonDistinctAggCallProcessedSoFar;
+        final List<Integer> newArgs = ImmutableList.of(arg);
         if (aggCall.getAggregation().getKind() == SqlKind.COUNT) {
           newCall =
               AggregateCall.create(new SqlSumEmptyIsZeroAggFunction(), false,
-                  aggCall.isApproximate(), newArgs, -1,
+                  aggCall.isApproximate(), newArgs, -1, aggCall.collation,
                   originalGroupSet.cardinality(), relBuilder.peek(),
                   aggCall.getType(), aggCall.getName());
         } else {
           newCall =
               AggregateCall.create(aggCall.getAggregation(), false,
-                  aggCall.isApproximate(), newArgs, -1,
+                  aggCall.isApproximate(), newArgs, -1, aggCall.collation,
                   originalGroupSet.cardinality(),
                   relBuilder.peek(), aggCall.getType(), aggCall.name);
         }
@@ -409,8 +410,8 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
     final int z = groupCount + distinctAggCalls.size();
     distinctAggCalls.add(
         AggregateCall.create(SqlStdOperatorTable.GROUPING, false, false,
-            ImmutableIntList.copyOf(fullGroupSet), -1, groupSets.size(),
-            relBuilder.peek(), null, "$g"));
+            ImmutableIntList.copyOf(fullGroupSet), -1, RelCollations.EMPTY,
+            groupSets.size(), relBuilder.peek(), null, "$g"));
     for (Ord<ImmutableBitSet> groupSet : Ord.zip(groupSets)) {
       filters.put(groupSet.e, z + groupSet.i);
     }
@@ -455,8 +456,8 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
       }
       final AggregateCall newCall =
           AggregateCall.create(aggregation, false, aggCall.isApproximate(),
-              newArgList, newFilterArg, aggregate.getGroupCount(), distinct,
-              null, aggCall.name);
+              newArgList, newFilterArg, aggCall.collation,
+              aggregate.getGroupCount(), distinct, null, aggCall.name);
       newCalls.add(newCall);
     }
 
@@ -665,8 +666,8 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
           aggCall.filterArg >= 0 ? sourceOf.get(aggCall.filterArg) : -1;
       final AggregateCall newAggCall =
           AggregateCall.create(aggCall.getAggregation(), false,
-              aggCall.isApproximate(), newArgs,
-              newFilterArg, aggCall.getType(), aggCall.getName());
+              aggCall.isApproximate(), newArgs, newFilterArg, aggCall.collation,
+              aggCall.getType(), aggCall.getName());
       assert refs.get(i) == null;
       if (n == 0) {
         refs.set(i,
@@ -754,7 +755,7 @@ public final class AggregateExpandDistinctAggregatesRule extends RelOptRule {
       }
       final AggregateCall newAggCall =
           AggregateCall.create(aggCall.getAggregation(), false,
-              aggCall.isApproximate(), newArgs, -1,
+              aggCall.isApproximate(), newArgs, -1, aggCall.collation,
               aggCall.getType(), aggCall.getName());
       newAggCalls.set(i, newAggCall);
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateExtractProjectRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateExtractProjectRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateExtractProjectRule.java
index 08b12dd..bd5a348 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateExtractProjectRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateExtractProjectRule.java
@@ -120,7 +120,7 @@ public class AggregateExtractProjectRule extends RelOptRule {
               .distinct(aggCall.isDistinct())
               .filter(filterArg)
               .approximate(aggCall.isApproximate())
-              .approximate(aggCall.isApproximate())
+              .sort(relBuilder.fields(aggCall.collation))
               .as(aggCall.name));
     }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateFilterTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateFilterTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateFilterTransposeRule.java
index 5c0e531..9b54f08 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateFilterTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateFilterTransposeRule.java
@@ -145,6 +145,7 @@ public class AggregateFilterTransposeRule extends RelOptRule {
         topAggCallList.add(
             AggregateCall.create(rollup, aggregateCall.isDistinct(),
                 aggregateCall.isApproximate(), ImmutableList.of(i++), -1,
+                aggregateCall.collation,
                 aggregateCall.type, aggregateCall.name));
       }
       final Aggregate topAggregate =

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectMergeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectMergeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectMergeRule.java
index c5ec515..925afbe 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectMergeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateProjectMergeRule.java
@@ -18,6 +18,7 @@ package org.apache.calcite.rel.rules;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Aggregate.Group;
@@ -29,13 +30,17 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate}
@@ -73,18 +78,26 @@ public class AggregateProjectMergeRule extends RelOptRule {
 
   public static RelNode apply(RelOptRuleCall call, Aggregate aggregate,
       Project project) {
-    final List<Integer> newKeys = new ArrayList<>();
+    // Find all fields which we need to be straightforward field projections.
+    final Set<Integer> interestingFields = new TreeSet<>();
+    interestingFields.addAll(aggregate.getGroupSet().asList());
+    for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
+      interestingFields.addAll(aggregateCall.getArgList());
+      if (aggregateCall.filterArg >= 0) {
+        interestingFields.add(aggregateCall.filterArg);
+      }
+      interestingFields.addAll(RelCollations.ordinals(aggregateCall.collation));
+    }
+
+    // Build the map from old to new; abort if any entry is not a
+    // straightforward field projection.
     final Map<Integer, Integer> map = new HashMap<>();
-    for (int key : aggregate.getGroupSet()) {
-      final RexNode rex = project.getProjects().get(key);
-      if (rex instanceof RexInputRef) {
-        final int newKey = ((RexInputRef) rex).getIndex();
-        newKeys.add(newKey);
-        map.put(key, newKey);
-      } else {
-        // Cannot handle "GROUP BY expression"
+    for (int source : interestingFields) {
+      final RexNode rex = project.getProjects().get(source);
+      if (!(rex instanceof RexInputRef)) {
         return null;
       }
+      map.put(source, ((RexInputRef) rex).getIndex());
     }
 
     final ImmutableBitSet newGroupSet = aggregate.getGroupSet().permute(map);
@@ -97,28 +110,12 @@ public class AggregateProjectMergeRule extends RelOptRule {
 
     final ImmutableList.Builder<AggregateCall> aggCalls =
         ImmutableList.builder();
+    final int sourceCount = aggregate.getInput().getRowType().getFieldCount();
+    final int targetCount = project.getInput().getRowType().getFieldCount();
+    final Mappings.TargetMapping targetMapping =
+        Mappings.target(map, sourceCount, targetCount);
     for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
-      final ImmutableList.Builder<Integer> newArgs = ImmutableList.builder();
-      for (int arg : aggregateCall.getArgList()) {
-        final RexNode rex = project.getProjects().get(arg);
-        if (rex instanceof RexInputRef) {
-          newArgs.add(((RexInputRef) rex).getIndex());
-        } else {
-          // Cannot handle "AGG(expression)"
-          return null;
-        }
-      }
-      final int newFilterArg;
-      if (aggregateCall.filterArg >= 0) {
-        final RexNode rex = project.getProjects().get(aggregateCall.filterArg);
-        if (!(rex instanceof RexInputRef)) {
-          return null;
-        }
-        newFilterArg = ((RexInputRef) rex).getIndex();
-      } else {
-        newFilterArg = -1;
-      }
-      aggCalls.add(aggregateCall.copy(newArgs.build(), newFilterArg));
+      aggCalls.add(aggregateCall.transform(targetMapping));
     }
 
     final Aggregate newAggregate =
@@ -130,6 +127,8 @@ public class AggregateProjectMergeRule extends RelOptRule {
     // contains duplicates.
     final RelBuilder relBuilder = call.builder();
     relBuilder.push(newAggregate);
+    final List<Integer> newKeys =
+        Lists.transform(aggregate.getGroupSet().asList(), map::get);
     if (!newKeys.equals(newGroupSet.asList())) {
       final List<Integer> posList = new ArrayList<>();
       for (int newKey : newKeys) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
index 68f6b16..2c9c4e5 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
@@ -316,6 +316,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
         oldCall.isApproximate(),
         ImmutableIntList.of(argOrdinal),
         filter,
+        oldCall.collation,
         aggFunction.inferReturnType(binding),
         null);
   }
@@ -339,6 +340,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
             oldCall.isApproximate(),
             oldCall.getArgList(),
             oldCall.filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel.getInput(),
             null,
@@ -349,6 +351,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
             oldCall.isApproximate(),
             oldCall.getArgList(),
             oldCall.filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel.getInput(),
             null,
@@ -396,14 +399,15 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
     final AggregateCall sumZeroCall =
         AggregateCall.create(SqlStdOperatorTable.SUM0, oldCall.isDistinct(),
             oldCall.isApproximate(), oldCall.getArgList(), oldCall.filterArg,
-            oldAggRel.getGroupCount(), oldAggRel.getInput(), null,
-            oldCall.name);
+            oldCall.collation, oldAggRel.getGroupCount(), oldAggRel.getInput(),
+            null, oldCall.name);
     final AggregateCall countCall =
         AggregateCall.create(SqlStdOperatorTable.COUNT,
             oldCall.isDistinct(),
             oldCall.isApproximate(),
             oldCall.getArgList(),
             oldCall.filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel,
             null,
@@ -494,6 +498,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
             oldCall.isApproximate(),
             ImmutableIntList.of(argOrdinal),
             oldCall.filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel.getInput(),
             null,
@@ -517,6 +522,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
             oldCall.isApproximate(),
             oldCall.getArgList(),
             oldCall.filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel,
             null,
@@ -589,6 +595,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
             oldCall.isApproximate(),
             ImmutableIntList.of(argOrdinal),
             filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel.getInput(),
             null,
@@ -634,6 +641,7 @@ public class AggregateReduceFunctionsRule extends RelOptRule {
             oldCall.isApproximate(),
             argOrdinals,
             filterArg,
+            oldCall.collation,
             oldAggRel.getGroupCount(),
             oldAggRel,
             null,

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
index e974763..317b0c5 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
@@ -236,6 +236,7 @@ public class AggregateStarTableRule extends RelOptRule {
       }
       return AggregateCall.create(roll, false,
           aggregateCall.isApproximate(), ImmutableList.of(offset + i), -1,
+          aggregateCall.collation,
           groupCount, relBuilder.peek(), null, aggregateCall.name);
     }
 
@@ -251,7 +252,7 @@ public class AggregateStarTableRule extends RelOptRule {
         newArgs.add(z);
       }
       return AggregateCall.create(aggregation, false,
-          aggregateCall.isApproximate(), newArgs, -1,
+          aggregateCall.isApproximate(), newArgs, -1, aggregateCall.collation,
           groupCount, relBuilder.peek(), null, aggregateCall.name);
     }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/AggregateUnionTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateUnionTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateUnionTransposeRule.java
index d6fd60e..0416338 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateUnionTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateUnionTransposeRule.java
@@ -173,8 +173,12 @@ public class AggregateUnionTransposeRule extends RelOptRule {
       AggregateCall newCall =
           AggregateCall.create(aggFun, origCall.isDistinct(),
               origCall.isApproximate(),
-              ImmutableList.of(groupCount + ord.i), -1, groupCount, input,
-              aggType, origCall.getName());
+              ImmutableList.of(groupCount + ord.i), -1,
+              origCall.collation,
+              groupCount,
+              input,
+              aggType,
+              origCall.getName());
       newCalls.add(newCall);
     }
     return newCalls;

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java b/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
index 39bccbf..3383f97 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
@@ -385,8 +385,7 @@ public abstract class SubQueryRemoveRule extends RelOptRule {
         // Builds the cross join
         builder.aggregate(builder.groupKey(),
             builder.count(false, "c"),
-            builder.aggregateCall(SqlStdOperatorTable.COUNT, builder.fields())
-                .as("ck"));
+            builder.count(builder.fields()).as("ck"));
         builder.as("ct");
         if (!variablesSet.isEmpty()) {
           builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet);

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexBuilder.java b/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
index 4a26cd8..5741a11 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
@@ -293,7 +293,8 @@ public class RexBuilder {
       final List<Integer> args = aggCall.getArgList();
       final List<Integer> nullableArgs = nullableArgs(args, aggArgTypes);
       if (!nullableArgs.equals(args)) {
-        aggCall = aggCall.copy(nullableArgs, aggCall.filterArg);
+        aggCall = aggCall.copy(nullableArgs, aggCall.filterArg,
+            aggCall.collation);
       }
     }
     RexNode rex = aggCallMapping.get(aggCall);

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 122b39e..54a8c2d 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -326,6 +326,15 @@ public interface CalciteResource {
   @BaseMessage("FILTER must not contain aggregate expression")
   ExInst<SqlValidatorException> aggregateInFilterIllegal();
 
+  @BaseMessage("WITHIN GROUP must not contain aggregate expression")
+  ExInst<SqlValidatorException> aggregateInWithinGroupIllegal();
+
+  @BaseMessage("Aggregate expression ''{0}'' must contain a within group clause")
+  ExInst<SqlValidatorException> aggregateMissingWithinGroupClause(String a0);
+
+  @BaseMessage("Aggregate expression ''{0}'' must not contain a within group clause")
+  ExInst<SqlValidatorException> withinGroupClauseIllegalInAggregate(String a0);
+
   @BaseMessage("Aggregate expression is illegal in ORDER BY clause of non-aggregating SELECT")
   ExInst<SqlValidatorException> aggregateIllegalInOrderBy();
 
@@ -436,6 +445,9 @@ public interface CalciteResource {
   @BaseMessage("DISTINCT/ALL not allowed with {0} function")
   ExInst<SqlValidatorException> functionQuantifierNotAllowed(String a0);
 
+  @BaseMessage("WITHIN GROUP not allowed with {0} function")
+  ExInst<SqlValidatorException> withinGroupNotAllowed(String a0);
+
   @BaseMessage("Some but not all arguments are named")
   ExInst<SqlValidatorException> someButNotAllArgumentsAreNamed();
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7bc9f140/core/src/main/java/org/apache/calcite/sql/SqlAggFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlAggFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlAggFunction.java
index 6c1f648..cca0ad0 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlAggFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlAggFunction.java
@@ -24,8 +24,11 @@ import org.apache.calcite.sql.type.SqlOperandTypeInference;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.util.Optionality;
 
 import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nonnull;
 
 /**
  * Abstract base class for the definition of an aggregate function: an operator
@@ -34,6 +37,7 @@ import java.util.List;
 public abstract class SqlAggFunction extends SqlFunction implements Context {
   private final boolean requiresOrder;
   private final boolean requiresOver;
+  private final Optionality requiresGroupOrder;
 
   //~ Constructors -----------------------------------------------------------
 
@@ -48,7 +52,8 @@ public abstract class SqlAggFunction extends SqlFunction implements Context {
       SqlFunctionCategory funcType) {
     // We leave sqlIdentifier as null to indicate that this is a builtin.
     this(name, null, kind, returnTypeInference, operandTypeInference,
-        operandTypeChecker, funcType, false, false);
+        operandTypeChecker, funcType, false, false,
+        Optionality.FORBIDDEN);
   }
 
   /** Creates a user-defined SqlAggFunction. */
@@ -62,7 +67,24 @@ public abstract class SqlAggFunction extends SqlFunction implements Context {
       SqlOperandTypeChecker operandTypeChecker,
       SqlFunctionCategory funcType) {
     this(name, sqlIdentifier, kind, returnTypeInference, operandTypeInference,
-        operandTypeChecker, funcType, false, false);
+        operandTypeChecker, funcType, false, false,
+        Optionality.FORBIDDEN);
+  }
+
+  @Deprecated // to be removed before 2.0
+  protected SqlAggFunction(
+      String name,
+      SqlIdentifier sqlIdentifier,
+      SqlKind kind,
+      SqlReturnTypeInference returnTypeInference,
+      SqlOperandTypeInference operandTypeInference,
+      SqlOperandTypeChecker operandTypeChecker,
+      SqlFunctionCategory funcType,
+      boolean requiresOrder,
+      boolean requiresOver) {
+    this(name, sqlIdentifier, kind, returnTypeInference, operandTypeInference,
+        operandTypeChecker, funcType, requiresOrder, requiresOver,
+        Optionality.FORBIDDEN);
   }
 
   /** Creates a built-in or user-defined SqlAggFunction or window function.
@@ -78,11 +100,13 @@ public abstract class SqlAggFunction extends SqlFunction implements Context {
       SqlOperandTypeChecker operandTypeChecker,
       SqlFunctionCategory funcType,
       boolean requiresOrder,
-      boolean requiresOver) {
+      boolean requiresOver,
+      Optionality requiresGroupOrder) {
     super(name, sqlIdentifier, kind, returnTypeInference, operandTypeInference,
         operandTypeChecker, null, funcType);
     this.requiresOrder = requiresOrder;
     this.requiresOver = requiresOver;
+    this.requiresGroupOrder = Objects.requireNonNull(requiresGroupOrder);
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -105,13 +129,40 @@ public abstract class SqlAggFunction extends SqlFunction implements Context {
       SqlValidatorScope scope,
       SqlValidatorScope operandScope) {
     super.validateCall(call, validator, scope, operandScope);
-    validator.validateAggregateParams(call, null, scope);
+    validator.validateAggregateParams(call, null, null, scope);
   }
 
   @Override public final boolean requiresOrder() {
     return requiresOrder;
   }
 
+  /** Returns whether this aggregate function must, may, or must not contain a
+   * {@code WITHIN GROUP (ORDER ...)} clause.
+   *
+   * <p>Cases:<ul>
+   *
+   * <li>If {@link Optionality#MANDATORY},
+   * then {@code AGG(x) WITHIN GROUP (ORDER BY 1)} is valid,
+   * and {@code AGG(x)} is invalid.
+   *
+   * <li>If {@link Optionality#OPTIONAL},
+   * then {@code AGG(x) WITHIN GROUP (ORDER BY 1)}
+   * and {@code AGG(x)} are both valid.
+   *
+   * <li>If {@link Optionality#IGNORED},
+   * then {@code AGG(x)} is valid,
+   * and {@code AGG(x) WITHIN GROUP (ORDER BY 1)} is valid but is
+   * treated the same as {@code AGG(x)}.
+   *
+   * <li>If {@link Optionality#FORBIDDEN},
+   * then {@code AGG(x) WITHIN GROUP (ORDER BY 1)} is invalid,
+   * and {@code AGG(x)} is valid.
+   * </ul>
+   */
+  public @Nonnull Optionality requiresGroupOrder() {
+    return requiresGroupOrder;
+  }
+
   @Override public final boolean requiresOver() {
     return requiresOver;
   }