You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/05/30 19:56:38 UTC

[incubator-druid] branch master updated: SQL: Allow select-sort-project query shapes. (#7769)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8649b8a  SQL: Allow select-sort-project query shapes. (#7769)
8649b8a is described below

commit 8649b8ab4caad29d77897272bfae7ffcde054f25
Author: Gian Merlino <gi...@imply.io>
AuthorDate: Thu May 30 12:56:29 2019 -0700

    SQL: Allow select-sort-project query shapes. (#7769)
    
    * SQL: Allow select-sort-project query shapes.
    
    Fixes #7768.
    
    Design changes:
    
    - In PartialDruidQuery, allow projection after select + sort by removing
      the SELECT_SORT query stage and instead allowing the SORT and
      SORT_PROJECT stages to apply either after aggregation or after a plain
      non-aggregating select. This is different from prior behavior, where
      SORT and SORT_PROJECT were only considered valid after aggregation
      stages. This logic change is in the "canAccept" method.
    - In DruidQuery, represent either kind of sorting with a single "Sorting"
      class (instead of DefaultLimitSpec). The Sorting class is still
      convertible into a DefaultLimitSpec, but is also convertible into the
      sorting parameters accepted by a Scan query.
    - In DruidQuery, represent post-select and post-sorting projections with
      a single "Projection" class. This obsoletes the SortProject and
      SelectProjection classes, and simplifies the DruidQuery by allowing us
      to move virtual-column and post-aggregator-creation logic into the
      new Projection class.
    - Split "DruidQuerySignature" into RowSignature and VirtualColumnRegistry.
      This effectively means that instead of having mutable and immutable
      versions of DruidQuerySignature, we instead have RowSignature (always
      immutable) and VirtualColumnRegistry (always mutable, but sometimes
      null). This change wasn't required, but IMO it makes the logic
      involving them easier to follow, and makes it more clear when the
      virtual column registry is active and when it's not.
    
    Other changes:
    
    - ConvertBoundsToSelectors now just accepts a RowSignature, but we
      use the VirtualColumnRegistry.getFullRowSignature() method to get
      a signature that includes all columns, and therefore allows us to
      simplify the logic (no need to special-case virtual columns).
    - Add `__time` to the Scan column list if the query is ordering by time.
    
    * Remove unused import.
---
 .../hll/sql/HllSketchSqlAggregator.java            |   8 +-
 .../quantiles/sql/DoublesSketchSqlAggregator.java  |   8 +-
 .../theta/sql/ThetaSketchSqlAggregator.java        |   8 +-
 .../bloom/sql/BloomFilterSqlAggregator.java        |   8 +-
 .../filter/sql/BloomFilterOperatorConversion.java  |  14 +-
 ...FixedBucketsHistogramQuantileSqlAggregator.java |   8 +-
 .../histogram/sql/QuantileSqlAggregator.java       |   8 +-
 .../druid/sql/calcite/aggregation/Aggregation.java |  21 +-
 .../sql/calcite/aggregation/SqlAggregator.java     |  32 +-
 .../builtin/ApproxCountDistinctSqlAggregator.java  |   8 +-
 .../aggregation/builtin/CountSqlAggregator.java    |  19 +-
 .../aggregation/builtin/SimpleSqlAggregator.java   |   6 +-
 .../druid/sql/calcite/expression/Expressions.java  | 103 ++--
 .../calcite/expression/SqlOperatorConversion.java  |  14 +-
 .../expression/builtin/LikeOperatorConversion.java |  18 +-
 .../filtration/ConvertBoundsToSelectors.java       |  16 +-
 .../druid/sql/calcite/filtration/Filtration.java   |  14 +-
 .../apache/druid/sql/calcite/rel/DruidQuery.java   | 613 +++++++++------------
 .../druid/sql/calcite/rel/DruidSemiJoin.java       |   3 +-
 .../org/apache/druid/sql/calcite/rel/Grouping.java |   9 +
 .../druid/sql/calcite/rel/PartialDruidQuery.java   |  56 +-
 .../apache/druid/sql/calcite/rel/Projection.java   | 257 +++++++++
 .../druid/sql/calcite/rel/SelectProjection.java    |  90 ---
 .../apache/druid/sql/calcite/rel/SortProject.java  | 109 ----
 .../org/apache/druid/sql/calcite/rel/Sorting.java  | 146 +++++
 ...rySignature.java => VirtualColumnRegistry.java} |  83 ++-
 .../apache/druid/sql/calcite/rule/DruidRules.java  |   5 -
 .../druid/sql/calcite/rule/GroupByRules.java       |  27 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 102 +++-
 .../sql/calcite/filtration/FiltrationTest.java     |   3 +-
 30 files changed, 1011 insertions(+), 805 deletions(-)

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java
index 93760ff..56931b6 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java
@@ -48,7 +48,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -71,7 +71,8 @@ public class HllSketchSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
       RexBuilder rexBuilder,
       String name,
       AggregateCall aggregateCall,
@@ -80,7 +81,6 @@ public class HllSketchSqlAggregator implements SqlAggregator
       boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
     // for string columns.
     final RexNode columnRexNode = Expressions.fromFieldAccess(
@@ -148,7 +148,7 @@ public class HllSketchSqlAggregator implements SqlAggregator
       if (columnArg.isDirectColumnAccess()) {
         dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
       } else {
-        VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+        VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
             plannerContext,
             columnArg,
             sqlTypeName
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregator.java
index 96e4c81..f1856ed 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregator.java
@@ -44,7 +44,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -66,7 +66,8 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final String name,
       final AggregateCall aggregateCall,
@@ -75,7 +76,6 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
       final boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     final DruidExpression input = Expressions.toDruidExpression(
         plannerContext,
         rowSignature,
@@ -179,7 +179,7 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
           k
       );
     } else {
-      VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+      VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
           plannerContext,
           input,
           SqlTypeName.FLOAT
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregator.java
index 339f169..44dd3df 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregator.java
@@ -47,7 +47,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -70,7 +70,8 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
       RexBuilder rexBuilder,
       String name,
       AggregateCall aggregateCall,
@@ -79,7 +80,6 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
       boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
     // for string columns.
     final RexNode columnRexNode = Expressions.fromFieldAccess(
@@ -136,7 +136,7 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
       if (columnArg.isDirectColumnAccess()) {
         dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
       } else {
-        VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+        VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
             plannerContext,
             columnArg,
             sqlTypeName
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
index 48415d6..bf17a68 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
@@ -46,7 +46,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -68,7 +68,8 @@ public class BloomFilterSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
       RexBuilder rexBuilder,
       String name,
       AggregateCall aggregateCall,
@@ -77,7 +78,6 @@ public class BloomFilterSqlAggregator implements SqlAggregator
       boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     final RexNode inputOperand = Expressions.fromFieldAccess(
         rowSignature,
         project,
@@ -168,7 +168,7 @@ public class BloomFilterSqlAggregator implements SqlAggregator
           input.getSimpleExtraction().getExtractionFn()
       );
     } else {
-      VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+      VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
           plannerContext,
           input,
           inputOperand.getType().getSqlTypeName()
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/sql/BloomFilterOperatorConversion.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/sql/BloomFilterOperatorConversion.java
index a27ec4f..60497ff 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/sql/BloomFilterOperatorConversion.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/sql/BloomFilterOperatorConversion.java
@@ -39,7 +39,8 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.expression.OperatorConversions;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -68,14 +69,15 @@ public class BloomFilterOperatorConversion extends DirectOperatorConversion
   @Override
   public DimFilter toDruidFilter(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      @Nullable VirtualColumnRegistry virtualColumnRegistry,
       final RexNode rexNode
   )
   {
     final List<RexNode> operands = ((RexCall) rexNode).getOperands();
     final DruidExpression druidExpression = Expressions.toDruidExpression(
         plannerContext,
-        querySignature.getRowSignature(),
+        rowSignature,
         operands.get(0)
     );
     if (druidExpression == null) {
@@ -100,8 +102,8 @@ public class BloomFilterOperatorConversion extends DirectOperatorConversion
           holder,
           druidExpression.getSimpleExtraction().getExtractionFn()
       );
-    } else {
-      VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+    } else if (virtualColumnRegistry != null) {
+      VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
           plannerContext,
           druidExpression,
           operands.get(0).getType().getSqlTypeName()
@@ -114,6 +116,8 @@ public class BloomFilterOperatorConversion extends DirectOperatorConversion
           holder,
           null
       );
+    } else {
+      return null;
     }
   }
 }
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
index 4c92082..85fb56f 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
@@ -44,7 +44,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -66,7 +66,8 @@ public class FixedBucketsHistogramQuantileSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
       RexBuilder rexBuilder,
       String name,
       AggregateCall aggregateCall,
@@ -75,7 +76,6 @@ public class FixedBucketsHistogramQuantileSqlAggregator implements SqlAggregator
       boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     final DruidExpression input = Expressions.toDruidExpression(
         plannerContext,
         rowSignature,
@@ -234,7 +234,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator implements SqlAggregator
           outlierHandlingMode
       );
     } else {
-      VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+      VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
           plannerContext,
           input,
           SqlTypeName.FLOAT
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
index c00ad49..240420a 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
@@ -46,7 +46,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -68,7 +68,8 @@ public class QuantileSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final String name,
       final AggregateCall aggregateCall,
@@ -77,7 +78,6 @@ public class QuantileSqlAggregator implements SqlAggregator
       final boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     final DruidExpression input = Expressions.toDruidExpression(
         plannerContext,
         rowSignature,
@@ -196,7 +196,7 @@ public class QuantileSqlAggregator implements SqlAggregator
       }
     } else {
       final VirtualColumn virtualColumn =
-          querySignature.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.FLOAT);
+          virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.FLOAT);
       virtualColumns.add(virtualColumn);
       aggregatorFactory = new ApproximateHistogramAggregatorFactory(
           histogramName,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregation.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregation.java
index c987aed..4b4dbd5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregation.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregation.java
@@ -31,7 +31,8 @@ import org.apache.druid.query.filter.AndDimFilter;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -150,7 +151,11 @@ public class Aggregation
            : Iterables.getOnlyElement(aggregatorFactories).getName();
   }
 
-  public Aggregation filter(final DruidQuerySignature querySignature, final DimFilter filter)
+  public Aggregation filter(
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
+      final DimFilter filter
+  )
   {
     if (filter == null) {
       return this;
@@ -173,13 +178,13 @@ public class Aggregation
     }
 
     final DimFilter baseOptimizedFilter = Filtration.create(filter)
-                                                    .optimizeFilterOnly(querySignature)
+                                                    .optimizeFilterOnly(virtualColumnRegistry.getFullRowSignature())
                                                     .getDimFilter();
 
     Set<VirtualColumn> aggVirtualColumnsPlusFilterColumns = new HashSet<>(virtualColumns);
     for (String column : baseOptimizedFilter.getRequiredColumns()) {
-      if (querySignature.isVirtualColumnDefined(column)) {
-        aggVirtualColumnsPlusFilterColumns.add(querySignature.getVirtualColumn(column));
+      if (virtualColumnRegistry.isVirtualColumnDefined(column)) {
+        aggVirtualColumnsPlusFilterColumns.add(virtualColumnRegistry.getVirtualColumn(column));
       }
     }
     final List<AggregatorFactory> newAggregators = new ArrayList<>();
@@ -187,15 +192,15 @@ public class Aggregation
       if (agg instanceof FilteredAggregatorFactory) {
         final FilteredAggregatorFactory filteredAgg = (FilteredAggregatorFactory) agg;
         for (String column : filteredAgg.getFilter().getRequiredColumns()) {
-          if (querySignature.isVirtualColumnDefined(column)) {
-            aggVirtualColumnsPlusFilterColumns.add(querySignature.getVirtualColumn(column));
+          if (virtualColumnRegistry.isVirtualColumnDefined(column)) {
+            aggVirtualColumnsPlusFilterColumns.add(virtualColumnRegistry.getVirtualColumn(column));
           }
         }
         newAggregators.add(
             new FilteredAggregatorFactory(
                 filteredAgg.getAggregator(),
                 Filtration.create(new AndDimFilter(ImmutableList.of(filteredAgg.getFilter(), baseOptimizedFilter)))
-                          .optimizeFilterOnly(querySignature)
+                          .optimizeFilterOnly(virtualColumnRegistry.getFullRowSignature())
                           .getDimFilter()
             )
         );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
index 1c15474..abefede 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
@@ -24,7 +24,8 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -45,26 +46,29 @@ public interface SqlAggregator
    * Returns a Druid Aggregation corresponding to a SQL {@link AggregateCall}. This method should ignore filters;
    * they will be applied to your aggregator in a later step.
    *
-   * @param plannerContext       SQL planner context
-   * @param querySignature       signature of the rows row signature and re-usable virtual column references
-   * @param rexBuilder           a rexBuilder, in case you need one
-   * @param name                 desired output name of the aggregation
-   * @param aggregateCall        aggregate call object
-   * @param project              project that should be applied before aggregation; may be null
-   * @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
-   *                             ignored if you do not want to re-use existing aggregations.
-   * @param finalizeAggregations true if this query should include explicit finalization for all of its
-   *                             aggregators, where required. This is set for subqueries where Druid's native query
-   *                             layer does not do this automatically.
+   * @param plannerContext        SQL planner context
+   * @param rowSignature          input row signature
+   * @param virtualColumnRegistry re-usable virtual column references
+   * @param rexBuilder            a rexBuilder, in case you need one
+   * @param name                  desired output name of the aggregation
+   * @param aggregateCall         aggregate call object
+   * @param project               project that should be applied before aggregation; may be null
+   * @param existingAggregations  existing aggregations for this query; useful for re-using aggregations. May be safely
+   *                              ignored if you do not want to re-use existing aggregations.
+   * @param finalizeAggregations  true if this query should include explicit finalization for all of its
+   *                              aggregators, where required. This is set for subqueries where Druid's native query
+   *                              layer does not do this automatically.
    *
    * @return aggregation, or null if the call cannot be translated
    */
   @Nullable
   Aggregation toDruidAggregation(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
       RexBuilder rexBuilder,
-      String name, AggregateCall aggregateCall,
+      String name,
+      AggregateCall aggregateCall,
       Project project,
       List<Aggregation> existingAggregations,
       boolean finalizeAggregations
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
index bc59a32..21cf736 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
@@ -47,7 +47,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -70,7 +70,8 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final String name,
       final AggregateCall aggregateCall,
@@ -79,7 +80,6 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
       final boolean finalizeAggregations
   )
   {
-    final RowSignature rowSignature = querySignature.getRowSignature();
     // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
     // for string columns.
     final RexNode rexNode = Expressions.fromFieldAccess(
@@ -112,7 +112,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
         dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
       } else {
         VirtualColumn virtualColumn =
-            querySignature.getOrCreateVirtualColumnForExpression(plannerContext, arg, sqlTypeName);
+            virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, arg, sqlTypeName);
         dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType);
         myvirtualColumns.add(virtualColumn);
       }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
index 6ab7a16..4bf35e1 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
@@ -36,7 +36,8 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -55,7 +56,8 @@ public class CountSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final String name,
       final AggregateCall aggregateCall,
@@ -66,7 +68,7 @@ public class CountSqlAggregator implements SqlAggregator
   {
     final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
         plannerContext,
-        querySignature.getRowSignature(),
+        rowSignature,
         aggregateCall,
         project
     );
@@ -83,7 +85,8 @@ public class CountSqlAggregator implements SqlAggregator
       if (plannerContext.getPlannerConfig().isUseApproximateCountDistinct()) {
         return APPROX_COUNT_DISTINCT.toDruidAggregation(
             plannerContext,
-            querySignature,
+            rowSignature,
+            virtualColumnRegistry,
             rexBuilder,
             name, aggregateCall, project, existingAggregations,
             finalizeAggregations
@@ -96,7 +99,7 @@ public class CountSqlAggregator implements SqlAggregator
 
       // COUNT(x) should count all non-null values of x.
       final RexNode rexNode = Expressions.fromFieldAccess(
-          querySignature.getRowSignature(),
+          rowSignature,
           project,
           Iterables.getOnlyElement(aggregateCall.getArgList())
       );
@@ -104,7 +107,8 @@ public class CountSqlAggregator implements SqlAggregator
       if (rexNode.getType().isNullable()) {
         final DimFilter nonNullFilter = Expressions.toFilter(
             plannerContext,
-            querySignature,
+            rowSignature,
+            virtualColumnRegistry,
             rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, ImmutableList.of(rexNode))
         );
 
@@ -113,7 +117,8 @@ public class CountSqlAggregator implements SqlAggregator
           throw new ISE("Could not create not-null filter for rexNode[%s]", rexNode);
         }
 
-        return Aggregation.create(new CountAggregatorFactory(name)).filter(querySignature, nonNullFilter);
+        return Aggregation.create(new CountAggregatorFactory(name))
+                          .filter(rowSignature, virtualColumnRegistry, nonNullFilter);
       } else {
         return Aggregation.create(new CountAggregatorFactory(name));
       }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
index b98450b..2415762 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
@@ -29,7 +29,7 @@ import org.apache.druid.sql.calcite.aggregation.Aggregations;
 import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -44,7 +44,8 @@ public abstract class SimpleSqlAggregator implements SqlAggregator
   @Override
   public Aggregation toDruidAggregation(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final String name,
       final AggregateCall aggregateCall,
@@ -56,7 +57,6 @@ public abstract class SimpleSqlAggregator implements SqlAggregator
     if (aggregateCall.isDistinct()) {
       return null;
     }
-    final RowSignature rowSignature = querySignature.getRowSignature();
 
     final List<DruidExpression> arguments = Aggregations.getArgumentsForSimpleAggregator(
         plannerContext,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java
index ef8764c..2413cdb 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java
@@ -58,7 +58,7 @@ import org.apache.druid.sql.calcite.filtration.Bounds;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 import org.joda.time.Interval;
 
@@ -208,14 +208,16 @@ public class Expressions
   /**
    * Translates "condition" to a Druid filter, or returns null if we cannot translate the condition.
    *
-   * @param plannerContext planner context
-   * @param querySignature   row signature of the dataSource to be filtered
-   * @param expression     Calcite row expression
+   * @param plannerContext        planner context
+   * @param rowSignature          input row signature
+   * @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
+   * @param expression            Calcite row expression
    */
   @Nullable
   public static DimFilter toFilter(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      @Nullable final VirtualColumnRegistry virtualColumnRegistry,
       final RexNode expression
   )
   {
@@ -224,20 +226,27 @@ public class Expressions
     if (kind == SqlKind.IS_TRUE || kind == SqlKind.IS_NOT_FALSE) {
       return toFilter(
           plannerContext,
-          querySignature,
+          rowSignature,
+          virtualColumnRegistry,
           Iterables.getOnlyElement(((RexCall) expression).getOperands())
       );
     } else if (kind == SqlKind.IS_FALSE || kind == SqlKind.IS_NOT_TRUE) {
       return new NotDimFilter(
           toFilter(
               plannerContext,
-              querySignature,
+              rowSignature,
+              virtualColumnRegistry,
               Iterables.getOnlyElement(((RexCall) expression).getOperands())
           )
       );
     } else if (kind == SqlKind.CAST && expression.getType().getSqlTypeName() == SqlTypeName.BOOLEAN) {
       // Calcite sometimes leaves errant, useless cast-to-booleans inside filters. Strip them and continue.
-      return toFilter(plannerContext, querySignature, Iterables.getOnlyElement(((RexCall) expression).getOperands()));
+      return toFilter(
+          plannerContext,
+          rowSignature,
+          virtualColumnRegistry,
+          Iterables.getOnlyElement(((RexCall) expression).getOperands())
+      );
     } else if (kind == SqlKind.AND
                || kind == SqlKind.OR
                || kind == SqlKind.NOT) {
@@ -245,7 +254,8 @@ public class Expressions
       for (final RexNode rexNode : ((RexCall) expression).getOperands()) {
         final DimFilter nextFilter = toFilter(
             plannerContext,
-            querySignature,
+            rowSignature,
+            virtualColumnRegistry,
             rexNode
         );
         if (nextFilter == null) {
@@ -264,7 +274,7 @@ public class Expressions
       }
     } else {
       // Handle filter conditions on everything else.
-      return toLeafFilter(plannerContext, querySignature, expression);
+      return toLeafFilter(plannerContext, rowSignature, virtualColumnRegistry, expression);
     }
   }
 
@@ -272,14 +282,16 @@ public class Expressions
    * Translates "condition" to a Druid filter, assuming it does not contain any boolean expressions. Returns null
    * if we cannot translate the condition.
    *
-   * @param plannerContext planner context
-   * @param querySignature   row signature of the dataSource to be filtered
-   * @param rexNode        Calcite row expression
+   * @param plannerContext        planner context
+   * @param rowSignature          input row signature
+   * @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
+   * @param rexNode               Calcite row expression
    */
   @Nullable
   private static DimFilter toLeafFilter(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      @Nullable final VirtualColumnRegistry virtualColumnRegistry,
       final RexNode rexNode
   )
   {
@@ -291,21 +303,29 @@ public class Expressions
 
     final DimFilter simpleFilter = toSimpleLeafFilter(
         plannerContext,
-        querySignature,
+        rowSignature,
+        virtualColumnRegistry,
         rexNode
     );
     return simpleFilter != null
            ? simpleFilter
-           : toExpressionLeafFilter(plannerContext, querySignature.getRowSignature(), rexNode);
+           : toExpressionLeafFilter(plannerContext, rowSignature, rexNode);
   }
 
   /**
-   * Translates to a simple leaf filter, i.e. is not an expression filter.
+   * Translates to a simple leaf filter, i.e. not an "expression" type filter. Note that the filter may still
+   * reference expression virtual columns, if and only if "virtualColumnRegistry" is defined.
+   *
+   * @param plannerContext        planner context
+   * @param rowSignature          input row signature
+   * @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
+   * @param rexNode               Calcite row expression
    */
   @Nullable
   private static DimFilter toSimpleLeafFilter(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      @Nullable final VirtualColumnRegistry virtualColumnRegistry,
       final RexNode rexNode
   )
   {
@@ -314,24 +334,23 @@ public class Expressions
     if (kind == SqlKind.IS_TRUE || kind == SqlKind.IS_NOT_FALSE) {
       return toSimpleLeafFilter(
           plannerContext,
-          querySignature,
+          rowSignature,
+          virtualColumnRegistry,
           Iterables.getOnlyElement(((RexCall) rexNode).getOperands())
       );
     } else if (kind == SqlKind.IS_FALSE || kind == SqlKind.IS_NOT_TRUE) {
       return new NotDimFilter(
           toSimpleLeafFilter(
               plannerContext,
-              querySignature,
+              rowSignature,
+              virtualColumnRegistry,
               Iterables.getOnlyElement(((RexCall) rexNode).getOperands())
           )
       );
     } else if (kind == SqlKind.IS_NULL || kind == SqlKind.IS_NOT_NULL) {
       final RexNode operand = Iterables.getOnlyElement(((RexCall) rexNode).getOperands());
 
-      // operand must be translatable to a SimpleExtraction to be simple-filterable
-      final DruidExpression druidExpression =
-          toDruidExpression(plannerContext, querySignature.getRowSignature(), operand);
-
+      final DruidExpression druidExpression = toDruidExpression(plannerContext, rowSignature, operand);
       if (druidExpression == null) {
         return null;
       }
@@ -343,20 +362,20 @@ public class Expressions
             NullHandling.defaultStringValue(),
             druidExpression.getSimpleExtraction().getExtractionFn()
         );
-      } else {
-        final VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+      } else if (virtualColumnRegistry != null) {
+        final VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
             plannerContext,
             druidExpression,
             operand.getType().getSqlTypeName()
         );
-        if (virtualColumn == null) {
-          return null;
-        }
+
         equalFilter = new SelectorDimFilter(
             virtualColumn.getOutputName(),
             NullHandling.defaultStringValue(),
             null
         );
+      } else {
+        return null;
       }
 
       return kind == SqlKind.IS_NOT_NULL ? new NotDimFilter(equalFilter) : equalFilter;
@@ -414,7 +433,7 @@ public class Expressions
       }
 
       // Translate lhs to a DruidExpression.
-      final DruidExpression lhsExpression = toDruidExpression(plannerContext, querySignature.getRowSignature(), lhs);
+      final DruidExpression lhsExpression = toDruidExpression(plannerContext, rowSignature, lhs);
       if (lhsExpression == null) {
         return null;
       }
@@ -432,17 +451,17 @@ public class Expressions
       if (lhsExpression.isSimpleExtraction()) {
         column = lhsExpression.getSimpleExtraction().getColumn();
         extractionFn = lhsExpression.getSimpleExtraction().getExtractionFn();
-      } else {
-        VirtualColumn virtualLhs = querySignature.getOrCreateVirtualColumnForExpression(
+      } else if (virtualColumnRegistry != null) {
+        VirtualColumn virtualLhs = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
             plannerContext,
             lhsExpression,
             lhs.getType().getSqlTypeName()
         );
-        if (virtualLhs == null) {
-          return null;
-        }
+
         column = virtualLhs.getOutputName();
         extractionFn = null;
+      } else {
+        return null;
       }
 
       if (column.equals(ColumnHolder.TIME_COLUMN_NAME) && extractionFn instanceof TimeFormatExtractionFn) {
@@ -515,22 +534,16 @@ public class Expressions
       return filter;
     } else if (rexNode instanceof RexCall) {
       final SqlOperator operator = ((RexCall) rexNode).getOperator();
-
-      final SqlOperatorConversion conversion =
-          plannerContext.getOperatorTable().lookupOperatorConversion(operator);
+      final SqlOperatorConversion conversion = plannerContext.getOperatorTable().lookupOperatorConversion(operator);
 
       if (conversion == null) {
         return null;
       } else {
-        DimFilter filter =
-            conversion.toDruidFilter(plannerContext, querySignature, rexNode);
-        if (filter != null) {
-          return filter;
-        }
-        return null;
+        return conversion.toDruidFilter(plannerContext, rowSignature, virtualColumnRegistry, rexNode);
       }
+    } else {
+      return null;
     }
-    return null;
   }
 
   /**
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/SqlOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/SqlOperatorConversion.java
index 88d4947..a88fc9d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/SqlOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/SqlOperatorConversion.java
@@ -23,7 +23,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
@@ -54,16 +54,18 @@ public interface SqlOperatorConversion
   /**
    * Returns a Druid filter corresponding to a Calcite {@code RexNode} used as a filter condition.
    *
-   * @param plannerContext   SQL planner context
-   * @param querySignature   signature of the rows being filtered, and any expression column references
-   * @param rexNode          filter expression rex node
+   * @param plannerContext        SQL planner context
+   * @param rowSignature          input row signature
+   * @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
+   * @param rexNode               filter expression rex node
    *
-   * @return filter, or null if the call cannot be translated
+   * @return filter, or null if the call cannot be translated to a filter
    */
   @Nullable
   default DimFilter toDruidFilter(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      @Nullable VirtualColumnRegistry virtualColumnRegistry,
       RexNode rexNode
   )
   {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/LikeOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/LikeOperatorConversion.java
index b1b7d9d..8019f27 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/LikeOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/LikeOperatorConversion.java
@@ -31,7 +31,8 @@ import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -55,14 +56,15 @@ public class LikeOperatorConversion extends DirectOperatorConversion
   @Override
   public DimFilter toDruidFilter(
       PlannerContext plannerContext,
-      DruidQuerySignature querySignature,
+      RowSignature rowSignature,
+      @Nullable VirtualColumnRegistry virtualColumnRegistry,
       RexNode rexNode
   )
   {
     final List<RexNode> operands = ((RexCall) rexNode).getOperands();
     final DruidExpression druidExpression = Expressions.toDruidExpression(
         plannerContext,
-        querySignature.getRowSignature(),
+        rowSignature,
         operands.get(0)
     );
     if (druidExpression == null) {
@@ -76,21 +78,21 @@ public class LikeOperatorConversion extends DirectOperatorConversion
           operands.size() > 2 ? RexLiteral.stringValue(operands.get(2)) : null,
           druidExpression.getSimpleExtraction().getExtractionFn()
       );
-    } else {
-      VirtualColumn v = querySignature.getOrCreateVirtualColumnForExpression(
+    } else if (virtualColumnRegistry != null) {
+      VirtualColumn v = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
           plannerContext,
           druidExpression,
           operands.get(0).getType().getSqlTypeName()
       );
-      if (v == null) {
-        return null;
-      }
+
       return new LikeDimFilter(
           v.getOutputName(),
           RexLiteral.stringValue(operands.get(1)),
           operands.size() > 2 ? RexLiteral.stringValue(operands.get(2)) : null,
           null
       );
+    } else {
+      return null;
     }
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/filtration/ConvertBoundsToSelectors.java b/sql/src/main/java/org/apache/druid/sql/calcite/filtration/ConvertBoundsToSelectors.java
index e75fe14..f93407a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/filtration/ConvertBoundsToSelectors.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/filtration/ConvertBoundsToSelectors.java
@@ -24,20 +24,20 @@ import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.sql.calcite.expression.SimpleExtraction;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 public class ConvertBoundsToSelectors extends BottomUpTransform
 {
-  private final DruidQuerySignature querySignature;
+  private final RowSignature rowSignature;
 
-  private ConvertBoundsToSelectors(final DruidQuerySignature querySignature)
+  private ConvertBoundsToSelectors(final RowSignature rowSignature)
   {
-    this.querySignature = querySignature;
+    this.rowSignature = rowSignature;
   }
 
-  public static ConvertBoundsToSelectors create(final DruidQuerySignature querySignature)
+  public static ConvertBoundsToSelectors create(final RowSignature rowSignature)
   {
-    return new ConvertBoundsToSelectors(querySignature);
+    return new ConvertBoundsToSelectors(rowSignature);
   }
 
   @Override
@@ -45,7 +45,7 @@ public class ConvertBoundsToSelectors extends BottomUpTransform
   {
     if (filter instanceof BoundDimFilter) {
       final BoundDimFilter bound = (BoundDimFilter) filter;
-      final StringComparator comparator = querySignature.getRowSignature().naturalStringComparator(
+      final StringComparator comparator = rowSignature.naturalStringComparator(
           SimpleExtraction.of(bound.getDimension(), bound.getExtractionFn())
       );
 
@@ -54,7 +54,7 @@ public class ConvertBoundsToSelectors extends BottomUpTransform
           && bound.getUpper().equals(bound.getLower())
           && !bound.isUpperStrict()
           && !bound.isLowerStrict()
-          && (querySignature.isVirtualColumnDefined(bound.getDimension()) || bound.getOrdering().equals(comparator))) {
+          && bound.getOrdering().equals(comparator)) {
         return new SelectorDimFilter(
             bound.getDimension(),
             bound.getUpper(),
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/filtration/Filtration.java b/sql/src/main/java/org/apache/druid/sql/calcite/filtration/Filtration.java
index 1d891d2..4d2a991 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/filtration/Filtration.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/filtration/Filtration.java
@@ -28,7 +28,7 @@ import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.filter.ExpressionDimFilter;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.table.RowSignature;
 import org.joda.time.Interval;
 
 import java.util.List;
@@ -108,15 +108,15 @@ public class Filtration
    *
    * @return equivalent Filtration
    */
-  public Filtration optimize(final DruidQuerySignature querySignature)
+  public Filtration optimize(final RowSignature rowSignature)
   {
     return transform(
         this,
         ImmutableList.of(
             CombineAndSimplifyBounds.instance(),
             MoveTimeFiltersToIntervals.instance(),
-            ConvertBoundsToSelectors.create(querySignature),
-            ConvertSelectorsToIns.create(querySignature.getRowSignature()),
+            ConvertBoundsToSelectors.create(rowSignature),
+            ConvertSelectorsToIns.create(rowSignature),
             MoveMarkerFiltersToIntervals.instance(),
             ValidateNoMarkerFiltersRemain.instance()
         )
@@ -128,7 +128,7 @@ public class Filtration
    *
    * @return equivalent Filtration
    */
-  public Filtration optimizeFilterOnly(final DruidQuerySignature querySignature)
+  public Filtration optimizeFilterOnly(final RowSignature rowSignature)
   {
     if (!intervals.equals(ImmutableList.of(eternity()))) {
       throw new ISE("Cannot optimizeFilterOnly when intervals are set");
@@ -138,8 +138,8 @@ public class Filtration
         this,
         ImmutableList.of(
             CombineAndSimplifyBounds.instance(),
-            ConvertBoundsToSelectors.create(querySignature),
-            ConvertSelectorsToIns.create(querySignature.getRowSignature())
+            ConvertBoundsToSelectors.create(rowSignature),
+            ConvertSelectorsToIns.create(rowSignature)
         )
     );
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index c5a7320..1c72b42 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -21,11 +21,11 @@ package org.apache.druid.sql.calcite.rel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
+import com.google.common.primitives.Ints;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.core.Aggregate;
@@ -44,18 +44,17 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
 import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
@@ -105,25 +104,19 @@ public class DruidQuery
   private final DimFilter filter;
 
   @Nullable
-  private final SelectProjection selectProjection;
+  private final Projection selectProjection;
 
   @Nullable
   private final Grouping grouping;
 
   @Nullable
-  private final SortProject sortProject;
-
-  @Nullable
-  private final DefaultLimitSpec limitSpec;
-
-  @Nullable
-  private final RelDataType outputRowType;
+  private final Sorting sorting;
 
   private final Query query;
-
-  private final DruidQuerySignature sourceQuerySignature;
-
+  private final RowSignature sourceRowSignature;
   private final RowSignature outputRowSignature;
+  private final RelDataType outputRowType;
+  private final VirtualColumnRegistry virtualColumnRegistry;
 
   public DruidQuery(
       final PartialDruidQuery partialQuery,
@@ -136,58 +129,88 @@ public class DruidQuery
   {
     this.dataSource = dataSource;
     this.outputRowType = partialQuery.leafRel().getRowType();
-    this.sourceQuerySignature = new DruidQuerySignature(sourceRowSignature);
+    this.sourceRowSignature = sourceRowSignature;
+    this.virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
     this.plannerContext = plannerContext;
 
     // Now the fun begins.
-    this.filter = computeWhereFilter(partialQuery, plannerContext, sourceQuerySignature);
-    this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceQuerySignature);
-    this.grouping = computeGrouping(
-        partialQuery,
-        plannerContext,
-        sourceQuerySignature,
-        rexBuilder,
-        finalizeAggregations
-    );
-
-    final RowSignature sortingInputRowSignature;
-    if (this.selectProjection != null) {
-      sortingInputRowSignature = this.selectProjection.getOutputRowSignature();
-    } else if (this.grouping != null) {
-      sortingInputRowSignature = this.grouping.getOutputRowSignature();
+    if (partialQuery.getWhereFilter() != null) {
+      this.filter = Preconditions.checkNotNull(
+          computeWhereFilter(
+              partialQuery,
+              plannerContext,
+              sourceRowSignature,
+              virtualColumnRegistry
+          )
+      );
+    } else {
+      this.filter = null;
+    }
+
+    // Only compute "selectProjection" if this is a non-aggregating query. (For aggregating queries, "grouping" will
+    // reflect select-project from partialQuery on its own.)
+    if (partialQuery.getSelectProject() != null && partialQuery.getAggregate() == null) {
+      this.selectProjection = Preconditions.checkNotNull(
+          computeSelectProjection(
+              partialQuery,
+              plannerContext,
+              computeOutputRowSignature(),
+              virtualColumnRegistry
+          )
+      );
+    } else {
+      this.selectProjection = null;
+    }
+
+    if (partialQuery.getAggregate() != null) {
+      this.grouping = Preconditions.checkNotNull(
+          computeGrouping(
+              partialQuery,
+              plannerContext,
+              computeOutputRowSignature(),
+              virtualColumnRegistry,
+              rexBuilder,
+              finalizeAggregations
+          )
+      );
+    } else {
+      this.grouping = null;
+    }
+
+    if (partialQuery.getSort() != null) {
+      this.sorting = Preconditions.checkNotNull(
+          computeSorting(
+              partialQuery,
+              plannerContext,
+              computeOutputRowSignature(),
+              // When sorting follows grouping, virtual columns cannot be used
+              partialQuery.getAggregate() != null ? null : virtualColumnRegistry
+          )
+      );
     } else {
-      sortingInputRowSignature = sourceRowSignature;
+      this.sorting = null;
     }
 
-    this.sortProject = computeSortProject(partialQuery, plannerContext, sortingInputRowSignature);
-
-    this.outputRowSignature = sortProject == null ? sortingInputRowSignature : sortProject.getOutputRowSignature();
-
-    this.limitSpec = computeLimitSpec(partialQuery, sortingInputRowSignature);
+    this.outputRowSignature = computeOutputRowSignature();
     this.query = computeQuery();
   }
 
-  @Nullable
+  @Nonnull
   private static DimFilter computeWhereFilter(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry
   )
   {
-    final Filter whereFilter = partialQuery.getWhereFilter();
-
-    if (whereFilter == null) {
-      return null;
-    }
-
-    return getDimFilter(plannerContext, querySignature, whereFilter);
+    return getDimFilter(plannerContext, rowSignature, virtualColumnRegistry, partialQuery.getWhereFilter());
   }
 
   @Nullable
   private static DimFilter computeHavingFilter(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature
+      final RowSignature aggregateSignature
   )
   {
     final Filter havingFilter = partialQuery.getHavingFilter();
@@ -196,20 +219,23 @@ public class DruidQuery
       return null;
     }
 
-    return getDimFilter(plannerContext, querySignature, havingFilter);
+    // null virtualColumnRegistry, since virtual columns cannot be referenced by "having" filters.
+    return getDimFilter(plannerContext, aggregateSignature, null, havingFilter);
   }
 
   @Nonnull
   private static DimFilter getDimFilter(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
-      Filter filter
+      final RowSignature rowSignature,
+      @Nullable final VirtualColumnRegistry virtualColumnRegistry,
+      final Filter filter
   )
   {
     final RexNode condition = filter.getCondition();
     final DimFilter dimFilter = Expressions.toFilter(
         plannerContext,
-        querySignature,
+        rowSignature,
+        virtualColumnRegistry,
         condition
     );
     if (dimFilter == null) {
@@ -219,83 +245,48 @@ public class DruidQuery
     }
   }
 
-  @Nullable
-  private static SelectProjection computeSelectProjection(
+  @Nonnull
+  private static Projection computeSelectProjection(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final DruidQuerySignature queryColumns
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry
   )
   {
-    final Project project = partialQuery.getSelectProject();
-
-    if (project == null || partialQuery.getAggregate() != null) {
-      return null;
-    }
-
-    final List<DruidExpression> expressions = new ArrayList<>();
-
-    for (final RexNode rexNode : project.getChildExps()) {
-      final DruidExpression expression = Expressions.toDruidExpression(
-          plannerContext,
-          queryColumns.getRowSignature(),
-          rexNode
-      );
-
-      if (expression == null) {
-        throw new CannotBuildQueryException(project, rexNode);
-      } else {
-        expressions.add(expression);
-      }
-    }
+    final Project project = Preconditions.checkNotNull(partialQuery.getSelectProject(), "selectProject");
 
-    final List<String> directColumns = new ArrayList<>();
-    final Set<VirtualColumn> virtualColumns = new HashSet<>();
-    final List<String> rowOrder = new ArrayList<>();
-
-    for (int i = 0; i < expressions.size(); i++) {
-      final DruidExpression expression = expressions.get(i);
-      if (expression.isDirectColumnAccess()) {
-        directColumns.add(expression.getDirectColumn());
-        rowOrder.add(expression.getDirectColumn());
-      } else {
-        VirtualColumn virtualColumn = queryColumns.getOrCreateVirtualColumnForExpression(
-            plannerContext,
-            expression,
-            project.getChildExps().get(i).getType().getSqlTypeName()
-        );
-        virtualColumns.add(virtualColumn);
-        rowOrder.add(virtualColumn.getOutputName());
-      }
+    if (partialQuery.getAggregate() != null) {
+      throw new ISE("Cannot have both 'selectProject' and 'aggregate', how can this be?");
+    } else {
+      return Projection.preAggregation(project, plannerContext, rowSignature, virtualColumnRegistry);
     }
-
-    return new SelectProjection(
-        directColumns,
-        ImmutableList.copyOf(virtualColumns),
-        RowSignature.from(rowOrder, project.getRowType())
-    );
   }
 
-  @Nullable
+  @Nonnull
   private static Grouping computeGrouping(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final DruidQuerySignature queryColumns,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final boolean finalizeAggregations
   )
   {
-    final Aggregate aggregate = partialQuery.getAggregate();
+    final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate(), "aggregate");
     final Project aggregateProject = partialQuery.getAggregateProject();
 
-    if (aggregate == null) {
-      return null;
-    }
+    final List<DimensionExpression> dimensions = computeDimensions(
+        partialQuery,
+        plannerContext,
+        rowSignature,
+        virtualColumnRegistry
+    );
 
-    final List<DimensionExpression> dimensions = computeDimensions(partialQuery, plannerContext, queryColumns);
     final List<Aggregation> aggregations = computeAggregations(
         partialQuery,
         plannerContext,
-        queryColumns,
+        rowSignature,
+        virtualColumnRegistry,
         rexBuilder,
         finalizeAggregations
     );
@@ -310,23 +301,23 @@ public class DruidQuery
         aggregate.getRowType()
     );
 
-    DruidQuerySignature aggregateSignature = queryColumns.asAggregateSignature(aggregateRowSignature);
     final DimFilter havingFilter = computeHavingFilter(
         partialQuery,
         plannerContext,
-        aggregateSignature
+        aggregateRowSignature
     );
 
     if (aggregateProject == null) {
       return Grouping.create(dimensions, aggregations, havingFilter, aggregateRowSignature);
     } else {
-      final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations(
+      final Projection postAggregationProjection = Projection.postAggregation(
+          aggregateProject,
           plannerContext,
           aggregateRowSignature,
-          aggregateProject,
           "p"
       );
-      projectRowOrderAndPostAggregations.postAggregations.forEach(
+
+      postAggregationProjection.getPostAggregators().forEach(
           postAggregator -> aggregations.add(Aggregation.create(postAggregator))
       );
 
@@ -342,106 +333,17 @@ public class DruidQuery
         }
       }
 
-      return Grouping.create(
-          dimensions,
-          aggregations,
-          havingFilter,
-          RowSignature.from(projectRowOrderAndPostAggregations.rowOrder, aggregateProject.getRowType())
-      );
+      return Grouping.create(dimensions, aggregations, havingFilter, postAggregationProjection.getOutputRowSignature());
     }
   }
 
-  @Nullable
-  private SortProject computeSortProject(
-      PartialDruidQuery partialQuery,
-      PlannerContext plannerContext,
-      RowSignature sortingInputRowSignature
-  )
-  {
-    final Project sortProject = partialQuery.getSortProject();
-    if (sortProject == null) {
-      return null;
-    } else {
-      final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations(
-          plannerContext,
-          sortingInputRowSignature,
-          sortProject,
-          "s"
-      );
-
-      return new SortProject(
-          sortingInputRowSignature,
-          projectRowOrderAndPostAggregations.postAggregations,
-          RowSignature.from(projectRowOrderAndPostAggregations.rowOrder, sortProject.getRowType())
-      );
-    }
-  }
-
-  private static class ProjectRowOrderAndPostAggregations
-  {
-    private final List<String> rowOrder;
-    private final List<PostAggregator> postAggregations;
-
-    ProjectRowOrderAndPostAggregations(List<String> rowOrder, List<PostAggregator> postAggregations)
-    {
-      this.rowOrder = rowOrder;
-      this.postAggregations = postAggregations;
-    }
-  }
-
-  private static ProjectRowOrderAndPostAggregations computePostAggregations(
-      PlannerContext plannerContext,
-      RowSignature inputRowSignature,
-      Project project,
-      String basePrefix
-  )
-  {
-    final List<String> rowOrder = new ArrayList<>();
-    final List<PostAggregator> aggregations = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findUnusedPrefix(
-        basePrefix,
-        new TreeSet<>(inputRowSignature.getRowOrder())
-    );
-
-    int outputNameCounter = 0;
-    for (final RexNode postAggregatorRexNode : project.getChildExps()) {
-      // Attempt to convert to PostAggregator.
-      final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
-          plannerContext,
-          inputRowSignature,
-          postAggregatorRexNode
-      );
-
-      if (postAggregatorExpression == null) {
-        throw new CannotBuildQueryException(project, postAggregatorRexNode);
-      }
-
-      if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
-        // Direct column access, without any type cast as far as Druid's runtime is concerned.
-        // (There might be a SQL-level type cast that we don't care about)
-        rowOrder.add(postAggregatorExpression.getDirectColumn());
-      } else {
-        final String postAggregatorName = outputNamePrefix + outputNameCounter++;
-        final PostAggregator postAggregator = new ExpressionPostAggregator(
-            postAggregatorName,
-            postAggregatorExpression.getExpression(),
-            null,
-            plannerContext.getExprMacroTable()
-        );
-        aggregations.add(postAggregator);
-        rowOrder.add(postAggregator.getName());
-      }
-    }
-
-    return new ProjectRowOrderAndPostAggregations(rowOrder, aggregations);
-  }
-
   /**
    * Returns dimensions corresponding to {@code aggregate.getGroupSet()}, in the same order.
    *
-   * @param partialQuery   partial query
-   * @param plannerContext planner context
-   * @param querySignature source row signature and re-usable virtual column references
+   * @param partialQuery          partial query
+   * @param plannerContext        planner context
+   * @param rowSignature          source row signature
+   * @param virtualColumnRegistry re-usable virtual column references
    *
    * @return dimensions
    *
@@ -450,29 +352,23 @@ public class DruidQuery
   private static List<DimensionExpression> computeDimensions(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry
   )
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<DimensionExpression> dimensions = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findUnusedPrefix(
-        "d",
-        new TreeSet<>(querySignature.getRowSignature().getRowOrder())
-    );
+    final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(rowSignature.getRowOrder()));
     int outputNameCounter = 0;
 
     for (int i : aggregate.getGroupSet()) {
       // Dimension might need to create virtual columns. Avoid giving it a name that would lead to colliding columns.
       final RexNode rexNode = Expressions.fromFieldAccess(
-          querySignature.getRowSignature(),
+          rowSignature,
           partialQuery.getSelectProject(),
           i
       );
-      final DruidExpression druidExpression = Expressions.toDruidExpression(
-          plannerContext,
-          querySignature.getRowSignature(),
-          rexNode
-      );
+      final DruidExpression druidExpression = Expressions.toDruidExpression(plannerContext, rowSignature, rexNode);
       if (druidExpression == null) {
         throw new CannotBuildQueryException(aggregate, rexNode);
       }
@@ -488,7 +384,7 @@ public class DruidQuery
 
       final String dimOutputName;
       if (!druidExpression.isSimpleExtraction()) {
-        virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
+        virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
             plannerContext,
             druidExpression,
             sqlTypeName
@@ -507,13 +403,14 @@ public class DruidQuery
   /**
    * Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order.
    *
-   * @param partialQuery         partial query
-   * @param plannerContext       planner context
-   * @param querySignature       source row signature and re-usable virtual column references
-   * @param rexBuilder           calcite RexBuilder
-   * @param finalizeAggregations true if this query should include explicit finalization for all of its
-   *                             aggregators, where required. Useful for subqueries where Druid's native query layer
-   *                             does not do this automatically.
+   * @param partialQuery          partial query
+   * @param plannerContext        planner context
+   * @param rowSignature          source row signature
+   * @param virtualColumnRegistry re-usable virtual column references
+   * @param rexBuilder            calcite RexBuilder
+   * @param finalizeAggregations  true if this query should include explicit finalization for all of its
+   *                              aggregators, where required. Useful for subqueries where Druid's native query layer
+   *                              does not do this automatically.
    *
    * @return aggregations
    *
@@ -522,24 +419,23 @@ public class DruidQuery
   private static List<Aggregation> computeAggregations(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final boolean finalizeAggregations
   )
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<Aggregation> aggregations = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findUnusedPrefix(
-        "a",
-        new TreeSet<>(querySignature.getRowSignature().getRowOrder())
-    );
+    final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(rowSignature.getRowOrder()));
 
     for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
       final String aggName = outputNamePrefix + i;
       final AggregateCall aggCall = aggregate.getAggCallList().get(i);
       final Aggregation aggregation = GroupByRules.translateAggregateCall(
           plannerContext,
-          querySignature,
+          rowSignature,
+          virtualColumnRegistry,
           rexBuilder,
           partialQuery.getSelectProject(),
           aggregations,
@@ -558,29 +454,23 @@ public class DruidQuery
     return aggregations;
   }
 
-  @Nullable
-  private static DefaultLimitSpec computeLimitSpec(
+  @Nonnull
+  private static Sorting computeSorting(
       final PartialDruidQuery partialQuery,
-      final RowSignature outputRowSignature
+      final PlannerContext plannerContext,
+      final RowSignature rowSignature,
+      @Nullable final VirtualColumnRegistry virtualColumnRegistry
   )
   {
-    final Sort sort;
-
-    if (partialQuery.getAggregate() == null) {
-      sort = partialQuery.getSelectSort();
-    } else {
-      sort = partialQuery.getSort();
-    }
-
-    if (sort == null) {
-      return null;
-    }
+    final Sort sort = Preconditions.checkNotNull(partialQuery.getSort(), "sort");
+    final Project sortProject = partialQuery.getSortProject();
 
-    final Integer limit = sort.fetch != null ? RexLiteral.intValue(sort.fetch) : null;
+    // Extract limit.
+    final Long limit = sort.fetch != null ? ((Number) RexLiteral.value(sort.fetch)).longValue() : null;
     final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size());
 
     if (sort.offset != null) {
-      // LimitSpecs don't accept offsets.
+      // Druid cannot currently handle LIMIT with OFFSET.
       throw new CannotBuildQueryException(sort);
     }
 
@@ -610,7 +500,7 @@ public class DruidQuery
 
       if (sortExpression.isA(SqlKind.INPUT_REF)) {
         final RexInputRef ref = (RexInputRef) sortExpression;
-        final String fieldName = outputRowSignature.getRowOrder().get(ref.getIndex());
+        final String fieldName = rowSignature.getRowOrder().get(ref.getIndex());
         orderBys.add(new OrderByColumnSpec(fieldName, direction, comparator));
       } else {
         // We don't support sorting by anything other than refs which actually appear in the query result.
@@ -618,42 +508,25 @@ public class DruidQuery
       }
     }
 
-    return new DefaultLimitSpec(orderBys, limit);
-  }
-
-  /**
-   * Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's
-   * a direct column access that doesn't require an implicit cast.
-   *
-   * @param aggregateRowSignature signature of the aggregation
-   * @param expression            post-aggregation expression
-   * @param rexNode               RexNode for the post-aggregation expression
-   *
-   * @return yes or no
-   */
-  private static boolean postAggregatorDirectColumnIsOk(
-      final RowSignature aggregateRowSignature,
-      final DruidExpression expression,
-      final RexNode rexNode
-  )
-  {
-    if (!expression.isDirectColumnAccess()) {
-      return false;
-    }
+    // Extract any post-sort Projection.
+    final Projection projection;
 
-    // Check if a cast is necessary.
-    final ExprType toExprType = Expressions.exprTypeForValueType(
-        aggregateRowSignature.getColumnType(expression.getDirectColumn())
-    );
+    if (sortProject == null) {
+      projection = null;
+    } else if (partialQuery.getAggregate() == null) {
+      if (virtualColumnRegistry == null) {
+        throw new ISE("Must provide 'virtualColumnRegistry' for pre-aggregation Projection!");
+      }
 
-    final ExprType fromExprType = Expressions.exprTypeForValueType(
-        Calcites.getValueTypeForSqlTypeName(rexNode.getType().getSqlTypeName())
-    );
+      projection = Projection.preAggregation(sortProject, plannerContext, rowSignature, virtualColumnRegistry);
+    } else {
+      projection = Projection.postAggregation(sortProject, plannerContext, rowSignature, "s");
+    }
 
-    return toExprType.equals(fromExprType);
+    return Sorting.create(orderBys, limit, projection);
   }
 
-  public VirtualColumns getVirtualColumns(final boolean includeDimensions)
+  private VirtualColumns getVirtualColumns(final boolean includeDimensions)
   {
     // 'sourceRowSignature' could provide a list of all defined virtual columns while constructing a query, but we
     // still want to collect the set of VirtualColumns this way to ensure we only add what is still being used after
@@ -663,50 +536,47 @@ public class DruidQuery
     // we always want to add any virtual columns used by the query level DimFilter
     if (filter != null) {
       for (String columnName : filter.getRequiredColumns()) {
-        if (sourceQuerySignature.isVirtualColumnDefined(columnName)) {
-          virtualColumns.add(sourceQuerySignature.getVirtualColumn(columnName));
+        if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) {
+          virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName));
         }
       }
     }
+
     if (selectProjection != null) {
       virtualColumns.addAll(selectProjection.getVirtualColumns());
-    } else {
-      if (grouping != null) {
-        if (includeDimensions) {
-          for (DimensionExpression expression : grouping.getDimensions()) {
-            if (sourceQuerySignature.isVirtualColumnDefined(expression.getOutputName())) {
-              virtualColumns.add(sourceQuerySignature.getVirtualColumn(expression.getOutputName()));
-            }
+    }
+
+    if (grouping != null) {
+      if (includeDimensions) {
+        for (DimensionExpression expression : grouping.getDimensions()) {
+          if (virtualColumnRegistry.isVirtualColumnDefined(expression.getOutputName())) {
+            virtualColumns.add(virtualColumnRegistry.getVirtualColumn(expression.getOutputName()));
           }
         }
+      }
 
-        for (Aggregation aggregation : grouping.getAggregations()) {
-          virtualColumns.addAll(aggregation.getVirtualColumns());
-        }
+      for (Aggregation aggregation : grouping.getAggregations()) {
+        virtualColumns.addAll(aggregation.getVirtualColumns());
       }
     }
 
+    if (sorting != null && sorting.getProjection() != null && grouping == null) {
+      // Sorting without grouping means we might have some post-sort Projection virtual columns.
+      virtualColumns.addAll(sorting.getProjection().getVirtualColumns());
+    }
+
     // sort for predictable output
     List<VirtualColumn> columns = new ArrayList<>(virtualColumns);
     columns.sort(Comparator.comparing(VirtualColumn::getOutputName));
     return VirtualColumns.create(columns);
   }
 
+  @Nullable
   public Grouping getGrouping()
   {
     return grouping;
   }
 
-  public DefaultLimitSpec getLimitSpec()
-  {
-    return limitSpec;
-  }
-
-  public SortProject getSortProject()
-  {
-    return sortProject;
-  }
-
   public RelDataType getOutputRowType()
   {
     return outputRowType;
@@ -723,6 +593,26 @@ public class DruidQuery
   }
 
   /**
+   * Return the {@link RowSignature} corresponding to the output of this query. This method may be called during
+   * construction, in which case it returns the output row signature at whatever phase of construction this method
+   * is called at. At the end of construction, the final result is assigned to {@link #outputRowSignature}.
+   */
+  private RowSignature computeOutputRowSignature()
+  {
+    if (sorting != null && sorting.getProjection() != null) {
+      return sorting.getProjection().getOutputRowSignature();
+    } else if (grouping != null) {
+      // Sanity check: cannot have both "grouping" and "selectProjection".
+      Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
+      return grouping.getOutputRowSignature();
+    } else if (selectProjection != null) {
+      return selectProjection.getOutputRowSignature();
+    } else {
+      return sourceRowSignature;
+    }
+  }
+
+  /**
    * Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery},
    * {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery}, or {@link SelectQuery}.
    *
@@ -794,26 +684,24 @@ public class DruidQuery
         // Timeseries only applies if the single dimension is granular __time.
         return null;
       }
-      if (limitSpec != null) {
-        // If there is a limit spec, set timeseriesLimit to given value if less than Integer.Max_VALUE
-        if (limitSpec.isLimited()) {
-          timeseriesLimit = limitSpec.getLimit();
+
+      if (sorting != null) {
+        // If there is sorting, set timeseriesLimit to given value if less than Integer.MAX_VALUE
+        if (sorting.isLimited()) {
+          timeseriesLimit = Ints.checkedCast(sorting.getLimit());
         }
 
-        if (limitSpec.getColumns().isEmpty()) {
-          descending = false;
-        } else {
-          // We're ok if the first order by is time (since every time value is distinct, the rest of the columns
-          // wouldn't matter anyway).
-          final OrderByColumnSpec firstOrderBy = limitSpec.getColumns().get(0);
-
-          if (firstOrderBy.getDimension().equals(dimensionExpression.getOutputName())) {
-            // Order by time.
-            descending = firstOrderBy.getDirection() == OrderByColumnSpec.Direction.DESCENDING;
-          } else {
-            // Order by something else.
+        switch (sorting.getSortKind(dimensionExpression.getOutputName())) {
+          case UNORDERED:
+          case TIME_ASCENDING:
+            descending = false;
+            break;
+          case TIME_DESCENDING:
+            descending = true;
+            break;
+          default:
+            // Sorting on a metric, maybe. Timeseries cannot handle.
             return null;
-          }
         }
       } else {
         // No limitSpec.
@@ -824,11 +712,11 @@ public class DruidQuery
       return null;
     }
 
-    final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
+    final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
 
     final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
-    if (sortProject != null) {
-      postAggregators.addAll(sortProject.getPostAggregators());
+    if (sorting != null && sorting.getProjection() != null) {
+      postAggregators.addAll(sorting.getProjection().getPostAggregators());
     }
     final Map<String, Object> theContext = new HashMap<>();
     theContext.put("skipEmptyBuckets", true);
@@ -859,9 +747,10 @@ public class DruidQuery
     // Must have GROUP BY one column, ORDER BY zero or one column, limit less than maxTopNLimit, and no HAVING.
     final boolean topNOk = grouping != null
                            && grouping.getDimensions().size() == 1
-                           && limitSpec != null
-                           && (limitSpec.getColumns().size() <= 1
-                               && limitSpec.getLimit() <= plannerContext.getPlannerConfig().getMaxTopNLimit())
+                           && sorting != null
+                           && (sorting.getOrderBys().size() <= 1
+                               && sorting.isLimited() && sorting.getLimit() <= plannerContext.getPlannerConfig()
+                                                                                             .getMaxTopNLimit())
                            && grouping.getHavingFilter() == null;
 
     if (!topNOk) {
@@ -870,14 +759,14 @@ public class DruidQuery
 
     final DimensionSpec dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
     final OrderByColumnSpec limitColumn;
-    if (limitSpec.getColumns().isEmpty()) {
+    if (sorting.getOrderBys().isEmpty()) {
       limitColumn = new OrderByColumnSpec(
           dimensionSpec.getOutputName(),
           OrderByColumnSpec.Direction.ASCENDING,
           Calcites.getStringComparatorForValueType(dimensionSpec.getOutputType())
       );
     } else {
-      limitColumn = Iterables.getOnlyElement(limitSpec.getColumns());
+      limitColumn = Iterables.getOnlyElement(sorting.getOrderBys());
     }
     final TopNMetricSpec topNMetricSpec;
 
@@ -900,11 +789,11 @@ public class DruidQuery
       return null;
     }
 
-    final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
+    final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
 
     final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
-    if (sortProject != null) {
-      postAggregators.addAll(sortProject.getPostAggregators());
+    if (sorting.getProjection() != null) {
+      postAggregators.addAll(sorting.getProjection().getPostAggregators());
     }
 
     return new TopNQuery(
@@ -912,7 +801,7 @@ public class DruidQuery
         getVirtualColumns(true),
         dimensionSpec,
         topNMetricSpec,
-        limitSpec.getLimit(),
+        Ints.checkedCast(sorting.getLimit()),
         filtration.getQuerySegmentSpec(),
         filtration.getDimFilter(),
         Granularities.ALL,
@@ -934,20 +823,22 @@ public class DruidQuery
       return null;
     }
 
-    final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
+    final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
 
     final DimFilterHavingSpec havingSpec;
     if (grouping.getHavingFilter() != null) {
       havingSpec = new DimFilterHavingSpec(
-          Filtration.create(grouping.getHavingFilter()).optimizeFilterOnly(sourceQuerySignature).getDimFilter(),
+          Filtration.create(grouping.getHavingFilter())
+                    .optimizeFilterOnly(grouping.getOutputRowSignature())
+                    .getDimFilter(),
           true
       );
     } else {
       havingSpec = null;
     }
     final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
-    if (sortProject != null) {
-      postAggregators.addAll(sortProject.getPostAggregators());
+    if (sorting != null && sorting.getProjection() != null) {
+      postAggregators.addAll(sorting.getProjection().getPostAggregators());
     }
 
     return new GroupByQuery(
@@ -960,7 +851,9 @@ public class DruidQuery
         grouping.getAggregatorFactories(),
         postAggregators,
         havingSpec,
-        limitSpec,
+        sorting != null
+        ? new DefaultLimitSpec(sorting.getOrderBys(), sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null)
+        : NoopLimitSpec.instance(),
         null,
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
     );
@@ -978,34 +871,44 @@ public class DruidQuery
       // Scan cannot GROUP BY.
       return null;
     }
-    if (limitSpec != null &&
-        (limitSpec.getColumns().size() > 1
-         || (limitSpec.getColumns().size() == 1 && !Iterables.getOnlyElement(limitSpec.getColumns())
-                                                             .getDimension()
-                                                             .equals(ColumnHolder.TIME_COLUMN_NAME)))) {
-      // Scan cannot ORDER BY non-time columns.
-      return null;
-    }
+
 
     if (outputRowSignature.getRowOrder().isEmpty()) {
       // Should never do a scan query without any columns that we're interested in. This is probably a planner bug.
       throw new ISE("WTF?! Attempting to convert to Scan query without any columns?");
     }
 
-    final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
+    final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
+    final ScanQuery.Order order;
+    long scanLimit = 0L;
 
-    // DefaultLimitSpec (which we use to "remember" limits) is int typed, and Integer.MAX_VALUE means "no limit".
-    final long scanLimit = limitSpec == null || limitSpec.getLimit() == Integer.MAX_VALUE
-                           ? 0L
-                           : (long) limitSpec.getLimit();
+    if (sorting != null) {
+      if (sorting.isLimited()) {
+        scanLimit = sorting.getLimit();
+      }
 
-    ScanQuery.Order order;
-    if (limitSpec == null || limitSpec.getColumns().size() == 0) {
-      order = ScanQuery.Order.NONE;
-    } else if (limitSpec.getColumns().get(0).getDirection() == OrderByColumnSpec.Direction.ASCENDING) {
-      order = ScanQuery.Order.ASCENDING;
+      final Sorting.SortKind sortKind = sorting.getSortKind(ColumnHolder.TIME_COLUMN_NAME);
+
+      if (sortKind == Sorting.SortKind.UNORDERED) {
+        order = ScanQuery.Order.NONE;
+      } else if (sortKind == Sorting.SortKind.TIME_ASCENDING) {
+        order = ScanQuery.Order.ASCENDING;
+      } else if (sortKind == Sorting.SortKind.TIME_DESCENDING) {
+        order = ScanQuery.Order.DESCENDING;
+      } else {
+        assert sortKind == Sorting.SortKind.NON_TIME;
+
+        // Scan cannot ORDER BY non-time columns.
+        return null;
+      }
     } else {
-      order = ScanQuery.Order.DESCENDING;
+      order = ScanQuery.Order.NONE;
+    }
+
+    // Compute the list of columns to select.
+    final Set<String> columns = new HashSet<>(outputRowSignature.getRowOrder());
+    if (order != ScanQuery.Order.NONE) {
+      columns.add(ColumnHolder.TIME_COLUMN_NAME);
     }
 
     return new ScanQuery(
@@ -1015,9 +918,9 @@ public class DruidQuery
         ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
         0,
         scanLimit,
-        order, // Will default to "none"
+        order,
         filtration.getDimFilter(),
-        Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())),
+        Ordering.natural().sortedCopy(columns),
         false,
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
     );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java
index f60fbb8..6248a36 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java
@@ -344,8 +344,7 @@ public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
 
       PartialDruidQuery newPartialQuery = PartialDruidQuery.create(leftPartialQuery.getScan())
                                                            .withWhereFilter(newWhereFilter)
-                                                           .withSelectProject(leftPartialQuery.getSelectProject())
-                                                           .withSelectSort(leftPartialQuery.getSelectSort());
+                                                           .withSelectProject(leftPartialQuery.getSelectProject());
 
       if (leftPartialQuery.getAggregate() != null) {
         newPartialQuery = newPartialQuery.withAggregate(leftPartialQuery.getAggregate());
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java
index 5a8dd38..8ce4cd8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java
@@ -36,6 +36,15 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * Represents Druid's concept of a "group by": dimensions, aggregations, post-aggregations, and 'having' filters. This
+ * is always something that can be handled by a groupBy query, and in some cases, it may be handleable by a timeseries
+ * or topN query type as well.
+ *
+ * This corresponds to a Calcite Aggregate + optional Filter + optional Project.
+ *
+ * It does not include sorting, limiting, or post-sorting projections: for this, see the {@link Sorting} class.
+ */
 public class Grouping
 {
   private final List<DimensionExpression> dimensions;
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
index a29bd64..e9473c6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -51,7 +51,6 @@ public class PartialDruidQuery
   private final RelNode scan;
   private final Filter whereFilter;
   private final Project selectProject;
-  private final Sort selectSort;
   private final Aggregate aggregate;
   private final Filter havingFilter;
   private final Project aggregateProject;
@@ -60,13 +59,19 @@ public class PartialDruidQuery
 
   public enum Stage
   {
+    // SCAN must be present on all queries.
     SCAN,
+
+    // WHERE_FILTER, SELECT_PROJECT may be present on any query.
     WHERE_FILTER,
     SELECT_PROJECT,
-    SELECT_SORT,
+
+    // AGGREGATE, HAVING_FILTER, AGGREGATE_PROJECT can only be present on aggregating queries.
     AGGREGATE,
     HAVING_FILTER,
     AGGREGATE_PROJECT,
+
+    // SORT, SORT_PROJECT may be present on any query.
     SORT,
     SORT_PROJECT
   }
@@ -76,7 +81,6 @@ public class PartialDruidQuery
       final RelNode scan,
       final Filter whereFilter,
       final Project selectProject,
-      final Sort selectSort,
       final Aggregate aggregate,
       final Project aggregateProject,
       final Filter havingFilter,
@@ -88,7 +92,6 @@ public class PartialDruidQuery
     this.scan = Preconditions.checkNotNull(scan, "scan");
     this.whereFilter = whereFilter;
     this.selectProject = selectProject;
-    this.selectSort = selectSort;
     this.aggregate = aggregate;
     this.aggregateProject = aggregateProject;
     this.havingFilter = havingFilter;
@@ -102,7 +105,7 @@ public class PartialDruidQuery
         scanRel.getCluster(),
         scanRel.getTable().getRelOptSchema()
     );
-    return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null);
+    return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null);
   }
 
   public RelNode getScan()
@@ -120,11 +123,6 @@ public class PartialDruidQuery
     return selectProject;
   }
 
-  public Sort getSelectSort()
-  {
-    return selectSort;
-  }
-
   public Aggregate getAggregate()
   {
     return aggregate;
@@ -158,7 +156,6 @@ public class PartialDruidQuery
         scan,
         newWhereFilter,
         selectProject,
-        selectSort,
         aggregate,
         aggregateProject,
         havingFilter,
@@ -200,24 +197,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         theProject,
-        selectSort,
-        aggregate,
-        aggregateProject,
-        havingFilter,
-        sort,
-        sortProject
-    );
-  }
-
-  public PartialDruidQuery withSelectSort(final Sort newSelectSort)
-  {
-    validateStage(Stage.SELECT_SORT);
-    return new PartialDruidQuery(
-        builderSupplier,
-        scan,
-        whereFilter,
-        selectProject,
-        newSelectSort,
         aggregate,
         aggregateProject,
         havingFilter,
@@ -234,7 +213,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         selectProject,
-        selectSort,
         newAggregate,
         aggregateProject,
         havingFilter,
@@ -251,7 +229,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         selectProject,
-        selectSort,
         aggregate,
         aggregateProject,
         newHavingFilter,
@@ -268,7 +245,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         selectProject,
-        selectSort,
         aggregate,
         newAggregateProject,
         havingFilter,
@@ -285,7 +261,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         selectProject,
-        selectSort,
         aggregate,
         aggregateProject,
         havingFilter,
@@ -302,7 +277,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         selectProject,
-        selectSort,
         aggregate,
         aggregateProject,
         havingFilter,
@@ -344,14 +318,11 @@ public class PartialDruidQuery
     } else if (stage.compareTo(currentStage) <= 0) {
       // Cannot go backwards.
       return false;
-    } else if (stage.compareTo(Stage.AGGREGATE) > 0 && aggregate == null) {
+    } else if (stage.compareTo(Stage.AGGREGATE) > 0 && stage.compareTo(Stage.SORT) < 0 && aggregate == null) {
       // Cannot do post-aggregation stages without an aggregation.
       return false;
-    } else if (stage.compareTo(Stage.AGGREGATE) >= 0 && selectSort != null) {
-      // Cannot do any aggregations after a select + sort.
-      return false;
     } else if (stage.compareTo(Stage.SORT) > 0 && sort == null) {
-      // Cannot add sort project without a sort
+      // Cannot do post-sort stages without a sort.
       return false;
     } else {
       // Looks good.
@@ -378,8 +349,6 @@ public class PartialDruidQuery
       return Stage.HAVING_FILTER;
     } else if (aggregate != null) {
       return Stage.AGGREGATE;
-    } else if (selectSort != null) {
-      return Stage.SELECT_SORT;
     } else if (selectProject != null) {
       return Stage.SELECT_PROJECT;
     } else if (whereFilter != null) {
@@ -409,8 +378,6 @@ public class PartialDruidQuery
         return havingFilter;
       case AGGREGATE:
         return aggregate;
-      case SELECT_SORT:
-        return selectSort;
       case SELECT_PROJECT:
         return selectProject;
       case WHERE_FILTER:
@@ -442,7 +409,6 @@ public class PartialDruidQuery
     return Objects.equals(scan, that.scan) &&
            Objects.equals(whereFilter, that.whereFilter) &&
            Objects.equals(selectProject, that.selectProject) &&
-           Objects.equals(selectSort, that.selectSort) &&
            Objects.equals(aggregate, that.aggregate) &&
            Objects.equals(havingFilter, that.havingFilter) &&
            Objects.equals(aggregateProject, that.aggregateProject) &&
@@ -457,7 +423,6 @@ public class PartialDruidQuery
         scan,
         whereFilter,
         selectProject,
-        selectSort,
         aggregate,
         havingFilter,
         aggregateProject,
@@ -473,7 +438,6 @@ public class PartialDruidQuery
            "scan=" + scan +
            ", whereFilter=" + whereFilter +
            ", selectProject=" + selectProject +
-           ", selectSort=" + selectSort +
            ", aggregate=" + aggregate +
            ", havingFilter=" + havingFilter +
            ", aggregateProject=" + aggregateProject +
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
new file mode 100644
index 0000000..5dcd90c
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexNode;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Used to represent projections (Calcite "Project"). These are embedded in {@link Sorting} and {@link Grouping} to
+ * store post-sorting and post-grouping projections, as well as directly in {@link DruidQuery} to store potential
+ * post-selection projections. They may be built using either virtual columns (pre-aggregation) or post-aggregators.
+ *
+ * It is expected that callers will create and use Projection instances in the same context (pre- or post-aggregation).
+ * If this isn't done properly (i.e. a caller creates a pre-aggregation Projection but then calls
+ * {@link #getPostAggregators()}) then an exception will be thrown.
+ */
+public class Projection
+{
+  @Nullable
+  private final List<PostAggregator> postAggregators;
+
+  @Nullable
+  private final List<VirtualColumn> virtualColumns;
+
+  private final RowSignature outputRowSignature;
+
+  private Projection(
+      @Nullable final List<PostAggregator> postAggregators,
+      @Nullable final List<VirtualColumn> virtualColumns,
+      final RowSignature outputRowSignature
+  )
+  {
+    if (postAggregators == null && virtualColumns == null) {
+      throw new IAE("postAggregators and virtualColumns cannot both be null");
+    } else if (postAggregators != null && virtualColumns != null) {
+      throw new IAE("postAggregators and virtualColumns cannot both be nonnull");
+    }
+
+    this.postAggregators = postAggregators;
+    this.virtualColumns = virtualColumns;
+    this.outputRowSignature = outputRowSignature;
+  }
+
+  public static Projection postAggregation(
+      final Project project,
+      final PlannerContext plannerContext,
+      final RowSignature inputRowSignature,
+      final String basePrefix
+  )
+  {
+    final List<String> rowOrder = new ArrayList<>();
+    final List<PostAggregator> postAggregators = new ArrayList<>();
+    final String outputNamePrefix = Calcites.findUnusedPrefix(
+        basePrefix,
+        new TreeSet<>(inputRowSignature.getRowOrder())
+    );
+
+    int outputNameCounter = 0;
+    for (final RexNode postAggregatorRexNode : project.getChildExps()) {
+      // Attempt to convert to PostAggregator.
+      final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
+          plannerContext,
+          inputRowSignature,
+          postAggregatorRexNode
+      );
+
+      if (postAggregatorExpression == null) {
+        throw new CannotBuildQueryException(project, postAggregatorRexNode);
+      }
+
+      if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
+        // Direct column access, without any type cast as far as Druid's runtime is concerned.
+        // (There might be a SQL-level type cast that we don't care about)
+        rowOrder.add(postAggregatorExpression.getDirectColumn());
+      } else {
+        final String postAggregatorName = outputNamePrefix + outputNameCounter++;
+        final PostAggregator postAggregator = new ExpressionPostAggregator(
+            postAggregatorName,
+            postAggregatorExpression.getExpression(),
+            null,
+            plannerContext.getExprMacroTable()
+        );
+        postAggregators.add(postAggregator);
+        rowOrder.add(postAggregator.getName());
+      }
+    }
+
+    return new Projection(postAggregators, null, RowSignature.from(rowOrder, project.getRowType()));
+  }
+
+  public static Projection preAggregation(
+      final Project project,
+      final PlannerContext plannerContext,
+      final RowSignature inputRowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry
+  )
+  {
+    final List<DruidExpression> expressions = new ArrayList<>();
+
+    for (final RexNode rexNode : project.getChildExps()) {
+      final DruidExpression expression = Expressions.toDruidExpression(
+          plannerContext,
+          inputRowSignature,
+          rexNode
+      );
+
+      if (expression == null) {
+        throw new CannotBuildQueryException(project, rexNode);
+      } else {
+        expressions.add(expression);
+      }
+    }
+
+    final Set<VirtualColumn> virtualColumns = new HashSet<>();
+    final List<String> rowOrder = new ArrayList<>();
+
+    for (int i = 0; i < expressions.size(); i++) {
+      final DruidExpression expression = expressions.get(i);
+      if (expression.isDirectColumnAccess()) {
+        rowOrder.add(expression.getDirectColumn());
+      } else {
+        final VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
+            plannerContext,
+            expression,
+            project.getChildExps().get(i).getType().getSqlTypeName()
+        );
+        virtualColumns.add(virtualColumn);
+        rowOrder.add(virtualColumn.getOutputName());
+      }
+    }
+
+    return new Projection(
+        null,
+        ImmutableList.copyOf(virtualColumns),
+        RowSignature.from(rowOrder, project.getRowType())
+    );
+  }
+
+  /**
+   * Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's
+   * a direct column access that doesn't require an implicit cast.
+   *
+   * @param aggregateRowSignature signature of the aggregation
+   * @param expression            post-aggregation expression
+   * @param rexNode               RexNode for the post-aggregation expression
+   *
+   * @return true if the expression is a direct column access that needs no implicit cast, false otherwise
+   */
+  private static boolean postAggregatorDirectColumnIsOk(
+      final RowSignature aggregateRowSignature,
+      final DruidExpression expression,
+      final RexNode rexNode
+  )
+  {
+    if (!expression.isDirectColumnAccess()) {
+      return false;
+    }
+
+    // Check if a cast is necessary.
+    final ExprType toExprType = Expressions.exprTypeForValueType(
+        aggregateRowSignature.getColumnType(expression.getDirectColumn())
+    );
+
+    final ExprType fromExprType = Expressions.exprTypeForValueType(
+        Calcites.getValueTypeForSqlTypeName(rexNode.getType().getSqlTypeName())
+    );
+
+    return toExprType.equals(fromExprType);
+  }
+
+  public List<PostAggregator> getPostAggregators()
+  {
+    // If you ever see this error, it probably means a Projection was created in pre-aggregation mode, but then
+    // used in a post-aggregation context. This is likely a bug somewhere in DruidQuery. See class-level Javadocs.
+    return Preconditions.checkNotNull(postAggregators, "postAggregators");
+  }
+
+  public List<VirtualColumn> getVirtualColumns()
+  {
+    // If you ever see this error, it probably means a Projection was created in post-aggregation mode, but then
+    // used in a pre-aggregation context. This is likely a bug somewhere in DruidQuery. See class-level Javadocs.
+    return Preconditions.checkNotNull(virtualColumns, "virtualColumns");
+  }
+
+  public RowSignature getOutputRowSignature()
+  {
+    return outputRowSignature;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Projection that = (Projection) o;
+    return Objects.equals(postAggregators, that.postAggregators) &&
+           Objects.equals(virtualColumns, that.virtualColumns) &&
+           Objects.equals(outputRowSignature, that.outputRowSignature);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(postAggregators, virtualColumns, outputRowSignature);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "Projection{" +
+           "postAggregators=" + postAggregators +
+           ", virtualColumns=" + virtualColumns +
+           ", outputRowSignature=" + outputRowSignature +
+           '}';
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/SelectProjection.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/SelectProjection.java
deleted file mode 100644
index 6175540..0000000
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/SelectProjection.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.calcite.rel;
-
-import org.apache.druid.segment.VirtualColumn;
-import org.apache.druid.sql.calcite.table.RowSignature;
-
-import java.util.List;
-import java.util.Objects;
-
-public class SelectProjection
-{
-  private final List<String> directColumns;
-  private final List<VirtualColumn> virtualColumns;
-  private final RowSignature outputRowSignature;
-
-  public SelectProjection(
-      final List<String> directColumns,
-      final List<VirtualColumn> virtualColumns,
-      final RowSignature outputRowSignature
-  )
-  {
-    this.directColumns = directColumns;
-    this.virtualColumns = virtualColumns;
-    this.outputRowSignature = outputRowSignature;
-  }
-
-  public List<String> getDirectColumns()
-  {
-    return directColumns;
-  }
-
-  public List<VirtualColumn> getVirtualColumns()
-  {
-    return virtualColumns;
-  }
-
-  public RowSignature getOutputRowSignature()
-  {
-    return outputRowSignature;
-  }
-
-  @Override
-  public boolean equals(final Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    final SelectProjection that = (SelectProjection) o;
-    return Objects.equals(directColumns, that.directColumns) &&
-           Objects.equals(virtualColumns, that.virtualColumns) &&
-           Objects.equals(outputRowSignature, that.outputRowSignature);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(directColumns, virtualColumns, outputRowSignature);
-  }
-
-  @Override
-  public String toString()
-  {
-    return "SelectProjection{" +
-           "directColumns=" + directColumns +
-           ", virtualColumns=" + virtualColumns +
-           ", outputRowSignature=" + outputRowSignature +
-           '}';
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/SortProject.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/SortProject.java
deleted file mode 100644
index f7c503b..0000000
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/SortProject.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.calcite.rel;
-
-import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.sql.calcite.table.RowSignature;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class SortProject
-{
-  private final RowSignature inputRowSignature;
-  private final List<PostAggregator> postAggregators;
-  private final RowSignature outputRowSignature;
-
-  SortProject(
-      RowSignature inputRowSignature,
-      List<PostAggregator> postAggregators,
-      RowSignature outputRowSignature
-  )
-  {
-    this.inputRowSignature = Preconditions.checkNotNull(inputRowSignature, "inputRowSignature");
-    this.postAggregators = Preconditions.checkNotNull(postAggregators, "postAggregators");
-    this.outputRowSignature = Preconditions.checkNotNull(outputRowSignature, "outputRowSignature");
-
-    final Set<String> inputColumnNames = new HashSet<>(inputRowSignature.getRowOrder());
-    final Set<String> postAggregatorNames = postAggregators.stream()
-                                                           .map(PostAggregator::getName)
-                                                           .collect(Collectors.toSet());
-
-    // Verify no collisions between inputs and outputs.
-    for (String postAggregatorName : postAggregatorNames) {
-      if (inputColumnNames.contains(postAggregatorName)) {
-        throw new ISE("Duplicate field name: %s", postAggregatorName);
-      }
-    }
-
-    // Verify that items in the output signature exist.
-    outputRowSignature.getRowOrder().forEach(field -> {
-      if (!inputColumnNames.contains(field) && !postAggregatorNames.contains(field)) {
-        throw new ISE("Missing field in rowOrder: %s", field);
-      }
-    });
-  }
-
-  public List<PostAggregator> getPostAggregators()
-  {
-    return postAggregators;
-  }
-
-  public RowSignature getOutputRowSignature()
-  {
-    return outputRowSignature;
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    SortProject sortProject = (SortProject) o;
-    return Objects.equals(inputRowSignature, sortProject.inputRowSignature) &&
-           Objects.equals(postAggregators, sortProject.postAggregators) &&
-           Objects.equals(outputRowSignature, sortProject.outputRowSignature);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(inputRowSignature, postAggregators, outputRowSignature);
-  }
-
-  @Override
-  public String toString()
-  {
-    return "SortProject{" +
-           "inputRowSignature=" + inputRowSignature +
-           ", postAggregators=" + postAggregators +
-           ", outputRowSignature=" + outputRowSignature +
-           '}';
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java
new file mode 100644
index 0000000..4b939e6
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents Druid's concept of sorting and limiting, including post-sort projections. The sorting and limiting piece
+ * may map onto multiple Druid concepts: LimitSpec (for groupBy), TopNMetricSpec and threshold (for topN),
+ * "descending" (for timeseries), or ScanQuery.Order (for scan). The post-sort projections will map onto either
+ * post-aggregations (for query types that aggregate) or virtual columns (for query types that don't).
+ *
+ * This corresponds to a Calcite Sort + optional Project.
+ */
+public class Sorting
+{
+  enum SortKind
+  {
+    UNORDERED,
+    TIME_ASCENDING,
+    TIME_DESCENDING,
+    NON_TIME
+  }
+
+  private final List<OrderByColumnSpec> orderBys;
+
+  @Nullable
+  private final Projection projection;
+
+  @Nullable
+  private final Long limit;
+
+  private Sorting(
+      final List<OrderByColumnSpec> orderBys,
+      @Nullable final Long limit,
+      @Nullable final Projection projection
+  )
+  {
+    this.orderBys = Preconditions.checkNotNull(orderBys, "orderBys");
+    this.limit = limit;
+    this.projection = projection;
+  }
+
+  public static Sorting create(
+      final List<OrderByColumnSpec> orderBys,
+      @Nullable final Long limit,
+      @Nullable final Projection projection
+  )
+  {
+    return new Sorting(orderBys, limit, projection);
+  }
+
+  public SortKind getSortKind(final String timeColumn)
+  {
+    if (orderBys.isEmpty()) {
+      return SortKind.UNORDERED;
+    } else {
+      if (orderBys.size() == 1) {
+        final OrderByColumnSpec orderBy = Iterables.getOnlyElement(orderBys);
+        if (orderBy.getDimension().equals(timeColumn)) {
+          return orderBy.getDirection() == OrderByColumnSpec.Direction.ASCENDING
+                 ? SortKind.TIME_ASCENDING
+                 : SortKind.TIME_DESCENDING;
+        }
+      }
+
+      return SortKind.NON_TIME;
+    }
+  }
+
+  public List<OrderByColumnSpec> getOrderBys()
+  {
+    return orderBys;
+  }
+
+  @Nullable
+  public Projection getProjection()
+  {
+    return projection;
+  }
+
+  public boolean isLimited()
+  {
+    return limit != null;
+  }
+
+  @Nullable
+  public Long getLimit()
+  {
+    return limit;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Sorting sorting = (Sorting) o;
+    return Objects.equals(orderBys, sorting.orderBys) &&
+           Objects.equals(projection, sorting.projection) &&
+           Objects.equals(limit, sorting.limit);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(orderBys, projection, limit);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "Sorting{" +
+           "orderBys=" + orderBys +
+           ", projection=" + projection +
+           ", limit=" + limit +
+           '}';
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuerySignature.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
similarity index 59%
rename from sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuerySignature.java
rename to sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
index 2b320ec..dc35391 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuerySignature.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
@@ -32,52 +32,38 @@ import java.util.Map;
 import java.util.TreeSet;
 
 /**
- * Wraps a {@link RowSignature} and provides facilities to re-use {@link VirtualColumn} definitions for dimensions,
- * filters, and filtered aggregators while constructing a {@link DruidQuery}
+ * Provides facilities to create and re-use {@link VirtualColumn} definitions for dimensions, filters, and filtered
+ * aggregators while constructing a {@link DruidQuery}.
  */
-public class DruidQuerySignature
+public class VirtualColumnRegistry
 {
-  private final RowSignature rowSignature;
-  private final boolean isAggregateSignature;
-
+  private final RowSignature baseRowSignature;
   private final Map<String, VirtualColumn> virtualColumnsByExpression;
   private final Map<String, VirtualColumn> virtualColumnsByName;
   private final String virtualColumnPrefix;
   private int virtualColumnCounter;
 
-  public DruidQuerySignature(RowSignature rowSignature)
-  {
-    this.isAggregateSignature = false;
-    this.rowSignature = rowSignature;
-    this.virtualColumnPrefix = rowSignature == null ? "v" : Calcites.findUnusedPrefix(
-        "v",
-        new TreeSet<>(rowSignature.getRowOrder())
-    );
-    this.virtualColumnsByExpression = new HashMap<>();
-    this.virtualColumnsByName = new HashMap<>();
-  }
-
-  private DruidQuerySignature(
-      RowSignature rowSignature,
-      String prefix,
+  private VirtualColumnRegistry(
+      RowSignature baseRowSignature,
+      String virtualColumnPrefix,
       Map<String, VirtualColumn> virtualColumnsByExpression,
-      Map<String, VirtualColumn> virtualColumnsByName,
-      boolean isAggregateSignature
+      Map<String, VirtualColumn> virtualColumnsByName
   )
   {
-    this.isAggregateSignature = isAggregateSignature;
-    this.rowSignature = rowSignature;
-    this.virtualColumnPrefix = prefix;
+    this.baseRowSignature = baseRowSignature;
+    this.virtualColumnPrefix = virtualColumnPrefix;
     this.virtualColumnsByExpression = virtualColumnsByExpression;
     this.virtualColumnsByName = virtualColumnsByName;
   }
 
-  /**
-   * Get {@link RowSignature} of {@link DruidQuery} under construction
-   */
-  public RowSignature getRowSignature()
+  public static VirtualColumnRegistry create(final RowSignature rowSignature)
   {
-    return rowSignature;
+    return new VirtualColumnRegistry(
+        rowSignature,
+        Calcites.findUnusedPrefix("v", new TreeSet<>(rowSignature.getRowOrder())),
+        new HashMap<>(),
+        new HashMap<>()
+    );
   }
 
   /**
@@ -88,19 +74,16 @@ public class DruidQuerySignature
     return virtualColumnsByName.containsKey(virtualColumnName);
   }
 
-
   /**
-   * Get existing or create new (if not {@link DruidQuerySignature#isAggregateSignature}) {@link VirtualColumn} for a given
-   * {@link DruidExpression}
+   * Get existing or create new {@link VirtualColumn} for a given {@link DruidExpression}.
    */
-  @Nullable
   public VirtualColumn getOrCreateVirtualColumnForExpression(
       PlannerContext plannerContext,
       DruidExpression expression,
       SqlTypeName typeName
   )
   {
-    if (!isAggregateSignature && !virtualColumnsByExpression.containsKey(expression.getExpression())) {
+    if (!virtualColumnsByExpression.containsKey(expression.getExpression())) {
       final String virtualColumnName = virtualColumnPrefix + virtualColumnCounter++;
       final VirtualColumn virtualColumn = expression.toVirtualColumn(
           virtualColumnName,
@@ -126,23 +109,25 @@ public class DruidQuerySignature
   @Nullable
   public VirtualColumn getVirtualColumn(String virtualColumnName)
   {
-    return virtualColumnsByName.getOrDefault(virtualColumnName, null);
+    return virtualColumnsByName.get(virtualColumnName);
   }
 
   /**
-   * Create as an "immutable" "aggregate" signature for a grouping, so that post aggregations and having filters
-   * can not define new virtual columns
-   * @param sourceSignature
-   * @return
+   * Get a signature representing the base signature plus all registered virtual columns.
    */
-  public DruidQuerySignature asAggregateSignature(RowSignature sourceSignature)
+  public RowSignature getFullRowSignature()
   {
-    return new DruidQuerySignature(
-        sourceSignature,
-        virtualColumnPrefix,
-        virtualColumnsByExpression,
-        virtualColumnsByName,
-        true
-    );
+    final RowSignature.Builder builder = RowSignature.builder();
+
+    for (String columnName : baseRowSignature.getRowOrder()) {
+      builder.add(columnName, baseRowSignature.getColumnType(columnName));
+    }
+
+    for (VirtualColumn virtualColumn : virtualColumnsByName.values()) {
+      final String columnName = virtualColumn.getOutputName();
+      builder.add(columnName, virtualColumn.capabilities(columnName).getType());
+    }
+
+    return builder.build();
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
index 07e414e..c20f2ac 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
@@ -60,11 +60,6 @@ public class DruidRules
             PartialDruidQuery::withSelectProject
         ),
         new DruidQueryRule<>(
-            Sort.class,
-            PartialDruidQuery.Stage.SELECT_SORT,
-            PartialDruidQuery::withSelectSort
-        ),
-        new DruidQueryRule<>(
             Aggregate.class,
             PartialDruidQuery.Stage.AGGREGATE,
             PartialDruidQuery::withAggregate
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
index 5989152..2209854 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
@@ -31,7 +31,8 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -52,7 +53,8 @@ public class GroupByRules
    */
   public static Aggregation translateAggregateCall(
       final PlannerContext plannerContext,
-      final DruidQuerySignature querySignature,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
       final RexBuilder rexBuilder,
       final Project project,
       final List<Aggregation> existingAggregations,
@@ -71,11 +73,18 @@ public class GroupByRules
       }
 
       final RexNode expression = project.getChildExps().get(call.filterArg);
-      final DimFilter nonOptimizedFilter = Expressions.toFilter(plannerContext, querySignature, expression);
+      final DimFilter nonOptimizedFilter = Expressions.toFilter(
+          plannerContext,
+          rowSignature,
+          virtualColumnRegistry,
+          expression
+      );
       if (nonOptimizedFilter == null) {
         return null;
       } else {
-        filter = Filtration.create(nonOptimizedFilter).optimizeFilterOnly(querySignature).getDimFilter();
+        filter = Filtration.create(nonOptimizedFilter)
+                           .optimizeFilterOnly(virtualColumnRegistry.getFullRowSignature())
+                           .getDimFilter();
       }
     } else {
       filter = null;
@@ -121,9 +130,13 @@ public class GroupByRules
 
     final Aggregation retVal = sqlAggregator.toDruidAggregation(
         plannerContext,
-        querySignature,
+        rowSignature,
+        virtualColumnRegistry,
         rexBuilder,
-        name, call, project, existingAggregationsWithSameFilter,
+        name,
+        call,
+        project,
+        existingAggregationsWithSameFilter,
         finalizeAggregations
     );
 
@@ -134,7 +147,7 @@ public class GroupByRules
       if (isUsingExistingAggregation(retVal, existingAggregationsWithSameFilter)) {
         return retVal;
       } else {
-        return retVal.filter(querySignature, filter);
+        return retVal.filter(rowSignature, virtualColumnRegistry, filter);
       }
     }
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 7db1910..9c8aac0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -744,6 +744,106 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
+  public void testSelectStarFromSelectSingleColumnWithLimitDescending() throws Exception
+  {
+    testQuery(
+        "SELECT * FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC) LIMIT 2",
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE1)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns(ImmutableList.of("__time", "dim1"))
+                .limit(2)
+                .order(ScanQuery.Order.DESCENDING)
+                .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"abc"},
+            new Object[]{"def"}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectProjectionFromSelectSingleColumnWithInnerLimitDescending() throws Exception
+  {
+    testQuery(
+        "SELECT 'beep ' || dim1 FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 2)",
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE1)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(expressionVirtualColumn("v0", "concat('beep ',\"dim1\")", ValueType.STRING))
+                .columns(ImmutableList.of("__time", "v0"))
+                .limit(2)
+                .order(ScanQuery.Order.DESCENDING)
+                .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"beep abc"},
+            new Object[]{"beep def"}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectProjectionFromSelectSingleColumnDescending() throws Exception
+  {
+    // Regression test for https://github.com/apache/incubator-druid/issues/7768.
+
+    testQuery(
+        "SELECT 'beep ' || dim1 FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC)",
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE1)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(expressionVirtualColumn("v0", "concat('beep ',\"dim1\")", ValueType.STRING))
+                .columns(ImmutableList.of("__time", "v0"))
+                .order(ScanQuery.Order.DESCENDING)
+                .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"beep abc"},
+            new Object[]{"beep def"},
+            new Object[]{"beep 1"},
+            new Object[]{"beep 2"},
+            new Object[]{"beep 10.1"},
+            new Object[]{"beep "}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectProjectionFromSelectSingleColumnWithInnerAndOuterLimitDescending() throws Exception
+  {
+    testQuery(
+        "SELECT 'beep ' || dim1 FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 4) LIMIT 2",
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(CalciteTests.DATASOURCE1)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(expressionVirtualColumn("v0", "concat('beep ',\"dim1\")", ValueType.STRING))
+                .columns(ImmutableList.of("__time", "v0"))
+                .limit(2)
+                .order(ScanQuery.Order.DESCENDING)
+                .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"beep abc"},
+            new Object[]{"beep def"}
+        )
+    );
+  }
+
+  @Test
   public void testGroupBySingleColumnDescendingNoTopN() throws Exception
   {
     testQuery(
@@ -1823,7 +1923,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     // It's also here so when we do support these features, we can have "real" tests for these queries.
 
     final List<String> queries = ImmutableList.of(
-        "SELECT dim1 FROM druid.foo ORDER BY dim1", // SELECT query with order by
+        "SELECT dim1 FROM druid.foo ORDER BY dim1", // SELECT query with order by non-__time
         "SELECT COUNT(*) FROM druid.foo x, druid.foo y", // Self-join
         "SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5", // DISTINCT with OFFSET
         "SELECT COUNT(*) FROM foo WHERE dim1 NOT IN (SELECT dim1 FROM foo WHERE dim2 = 'a')", // NOT IN subquery
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/filtration/FiltrationTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/filtration/FiltrationTest.java
index 9d91a42..d8f6f6f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/filtration/FiltrationTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/filtration/FiltrationTest.java
@@ -25,7 +25,6 @@ import org.apache.druid.query.filter.IntervalDimFilter;
 import org.apache.druid.query.filter.NotDimFilter;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
-import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
 import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.junit.Assert;
@@ -45,7 +44,7 @@ public class FiltrationTest extends CalciteTestBase
             )
         ),
         null
-    ).optimize(new DruidQuerySignature(RowSignature.builder().add(ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG).build()));
+    ).optimize(RowSignature.builder().add(ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG).build());
 
     Assert.assertEquals(
         ImmutableList.of(Filtration.eternity()),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org