Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/02 04:41:55 UTC

[impala] branch master updated: IMPALA-9983 : Pushdown limit to analytic sort operator

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ecfc1af  IMPALA-9983 : Pushdown limit to analytic sort operator
ecfc1af is described below

commit ecfc1af0db18de7ad47406b9bff3dcdfdb9d2a05
Author: Aman Sinha <am...@cloudera.com>
AuthorDate: Mon Jul 20 12:25:30 2020 -0700

    IMPALA-9983 : Pushdown limit to analytic sort operator
    
    This patch pushes the LIMIT from a top level Sort down to
    the Sort below an Analytic operator when it is safe to do
    so. There are several qualifying checks that are done. The
    optimization is done at the time of creating the top level
    Sort in the single node planner. When the pushdown is
    applicable, the analytic sort is converted to a TopN sort.
    Further, this is split into a bottom TopN and an upper
    TopN separated by a hash partition exchange. This
    ensures that the limit is applied as early as possible
    before hash partitioning.
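    
    To illustrate why the split is safe, here is a minimal, self-contained
    Java sketch (hypothetical data, not Impala code). Applying the same
    TopN once per hash partition and once after the exchange yields the
    same rows as a single global TopN, because any row in the global top
    'limit' also survives the local TopN of its own partition:
    
      import java.util.*;
      import java.util.stream.*;
    
      public class TwoPhaseTopNDemo {
        public static void main(String[] args) {
          // Rows of (partitionKey, sortKey); ORDER BY both, LIMIT 3.
          List<int[]> rows = Arrays.asList(new int[] {1, 9},
              new int[] {2, 1}, new int[] {1, 4}, new int[] {2, 7},
              new int[] {1, 2}, new int[] {2, 3}, new int[] {2, 8});
          Comparator<int[]> order = Comparator
              .<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]);
          int limit = 3;
    
          // Single global TopN, as a non-distributed sort would compute it.
          List<int[]> global = rows.stream().sorted(order).limit(limit)
              .collect(Collectors.toList());
    
          // Bottom TopN per hash partition, then upper TopN after the
          // "exchange" (modeled here by flattening the partitions). The
          // bottom TopN drops (2, 8) early, before any data movement.
          Map<Integer, List<int[]>> parts = rows.stream()
              .collect(Collectors.groupingBy(r -> r[0] % 2));
          List<int[]> twoPhase = parts.values().stream()
              .flatMap(p -> p.stream().sorted(order).limit(limit))
              .sorted(order).limit(limit).collect(Collectors.toList());
    
          // Both print: [1, 2] [1, 4] [1, 9]
          for (List<int[]> result : Arrays.asList(global, twoPhase)) {
            System.out.println(result.stream().map(Arrays::toString)
                .collect(Collectors.joining(" ")));
          }
        }
      }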
    
    Fixed a couple of additional related issues uncovered as a
    result of the limit pushdown:
     - Changed the analytic sort's partition-by expr sort
       semantic from NULLS FIRST to NULLS LAST to ensure
       correctness in the presence of limit.
     - The LIMIT on the analytic sort node was causing it to
       be treated as a merging point in the distributed planner.
       Fixed it by introducing an API allowPartitioned() in
       PlanNode.
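    
    To see the NULLS ordering issue concretely, here is a tiny Java
    sketch (hypothetical values, not Impala code). With the limit pushed
    below the analytic operator, cutting off rows under a NULLS FIRST
    order would keep the wrong prefix for a top-level ORDER BY, whose
    ASC default is NULLS LAST:
    
      import java.util.*;
      import java.util.stream.*;
    
      public class NullsOrderDemo {
        public static void main(String[] args) {
          List<Integer> keys = Arrays.asList(null, null, 3, 1, 2);
          int limit = 2;
    
          // Correct result for ORDER BY k ASC NULLS LAST LIMIT 2.
          System.out.println(keys.stream()
              .sorted(Comparator.nullsLast(Comparator.<Integer>naturalOrder()))
              .limit(limit).collect(Collectors.toList()));  // [1, 2]
    
          // A NULLS FIRST cutoff in the analytic sort would instead
          // retain only the NULL rows and drop the rows that the
          // top-level sort needs.
          System.out.println(keys.stream()
              .sorted(Comparator.nullsFirst(Comparator.<Integer>naturalOrder()))
              .limit(limit).collect(Collectors.toList()));  // [null, null]
        }
      }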
    
    Testing:
     - Ran PlannerTest and updated several EXPLAIN plans.
     - Added Planner tests for both positive and negative cases of
       limit pushdown.
     - Ran end-to-end TPC-DS queries. Specifically tested
       TPC-DS q67 for limit pushdown and result correctness.
     - Added targeted end-to-end tests using TPC-H dataset.
    
    Change-Id: Ib39f46a7bb75a34466eef7f91ddc25b6e6c99284
    Reviewed-on: http://gerrit.cloudera.org:8080/16219
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/AnalyticExpr.java   |   2 +-
 .../org/apache/impala/analysis/AnalyticWindow.java |   2 +-
 .../java/org/apache/impala/analysis/SortInfo.java  |   4 +
 .../apache/impala/planner/AnalyticEvalNode.java    | 171 +++++
 .../org/apache/impala/planner/AnalyticPlanner.java |  16 +-
 .../apache/impala/planner/DistributedPlanner.java  |  46 +-
 .../java/org/apache/impala/planner/PlanNode.java   |   2 +
 .../java/org/apache/impala/planner/SelectNode.java |  13 +
 .../apache/impala/planner/SingleNodePlanner.java   |  81 +++
 .../java/org/apache/impala/planner/SortNode.java   |  38 +-
 .../org/apache/impala/planner/PlannerTest.java     |   8 +
 .../queries/PlannerTest/analytic-fns-mt-dop.test   |  24 +-
 .../queries/PlannerTest/analytic-fns.test          | 198 +++---
 .../queries/PlannerTest/constant-folding.test      |   2 +-
 .../queries/PlannerTest/convert-to-cnf.test        |   2 +-
 .../queries/PlannerTest/inline-view.test           |   2 +-
 .../queries/PlannerTest/insert.test                | 110 +--
 .../PlannerTest/limit-pushdown-analytic.test       | 782 +++++++++++++++++++++
 .../queries/PlannerTest/max-row-size.test          |  22 +-
 .../queries/PlannerTest/mt-dop-validation.test     |  38 +-
 .../queries/PlannerTest/nested-collections.test    |  32 +-
 .../queries/PlannerTest/resource-requirements.test | 139 +++-
 .../PlannerTest/runtime-filter-propagation.test    |   8 +-
 .../queries/PlannerTest/semi-join-distinct.test    |  12 +-
 .../PlannerTest/sort-expr-materialization.test     |   2 +-
 .../queries/PlannerTest/tpcds-all.test             |  94 +--
 .../tpch/queries/limit-pushdown-analytic.test      |  74 ++
 tests/query_test/test_limit_pushdown_analytic.py   |  37 +
 28 files changed, 1673 insertions(+), 288 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
index 6ca8809..1fac321 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
@@ -219,7 +219,7 @@ public class AnalyticExpr extends Expr {
     return isAnalyticFn(fn, MIN) || isAnalyticFn(fn, MAX);
   }
 
-  static private boolean isRankingFn(Function fn) {
+  public static boolean isRankingFn(Function fn) {
     return isAnalyticFn(fn, RANK) || isAnalyticFn(fn, DENSERANK) ||
         isAnalyticFn(fn, ROWNUMBER);
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
index 8c03e56..7b460f9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
@@ -59,7 +59,7 @@ public class AnalyticWindow {
     }
   }
 
-  enum BoundaryType {
+  public enum BoundaryType {
     UNBOUNDED_PRECEDING("UNBOUNDED PRECEDING"),
     UNBOUNDED_FOLLOWING("UNBOUNDED FOLLOWING"),
     CURRENT_ROW("CURRENT ROW"),
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index edbdfea..ab2babf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -49,6 +49,8 @@ public class SortInfo {
       Expr.FUNCTION_CALL_COST;
 
   private List<Expr> sortExprs_;
+  // List of original sort exprs (before smap substitutions)
+  private List<Expr> origSortExprs_;
   private final List<Boolean> isAscOrder_;
   // True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
   private final List<Boolean> nullsFirstParams_;
@@ -72,6 +74,7 @@ public class SortInfo {
     Preconditions.checkArgument(sortExprs.size() == isAscOrder.size());
     Preconditions.checkArgument(sortExprs.size() == nullsFirstParams.size());
     sortExprs_ = sortExprs;
+    origSortExprs_ = Expr.cloneList(sortExprs_);
     isAscOrder_ = isAscOrder;
     nullsFirstParams_ = nullsFirstParams;
     materializedExprs_ = new ArrayList<>();
@@ -93,6 +96,7 @@ public class SortInfo {
   }
 
   public List<Expr> getSortExprs() { return sortExprs_; }
+  public List<Expr> getOrigSortExprs() { return origSortExprs_; }
   public List<Boolean> getIsAscOrder() { return isAscOrder_; }
   public List<Boolean> getNullsFirstParams() { return nullsFirstParams_; }
   public List<Expr> getMaterializedExprs() { return materializedExprs_; }
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 1b39d50..efc2273 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -20,9 +20,11 @@ package org.apache.impala.planner;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.common.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.impala.analysis.AnalyticExpr;
 import org.apache.impala.analysis.AnalyticWindow;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
@@ -30,6 +32,7 @@ import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.CompoundPredicate;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.OrderByElement;
 import org.apache.impala.analysis.SlotDescriptor;
@@ -37,6 +40,8 @@ import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.CompoundPredicate.Operator;
+import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.thrift.TAnalyticNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
@@ -356,4 +361,170 @@ public class AnalyticEvalNode extends PlanNode {
         .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize).setMaxRowBufferBytes(bufferSize).build();
   }
+
+  /**
+   * Check if it is safe to push down limit to the Sort node of this AnalyticEval.
+   * Qualifying checks:
+   *  - The analytic node is evaluating a single analytic function which must be
+   *    a ranking function.
+   *  - The partition-by exprs must be a prefix of the sort exprs in sortInfo
+   *  - If there is a predicate on the analytic function (provided through the
+   *    selectNode), the predicate's eligibility is checked (see further below)
+   * @param sortInfo The sort info from the outer sort node
+   * @param selectNode The selection node with predicates on analytic function.
+   *    This can be null if no such predicate is present.
+   * @param limit Limit value from the outer sort node
+   * @param analyticNodeSort The analytic sort associated with this analytic node
+   * @param sortExprsForPartitioning A placeholder list supplied by the caller;
+   *     it is populated with the sort exprs of the analytic sort that will later
+   *     be used for hash partitioning of the distributed TopN.
+   * @param analyzer analyzer instance
+   * @return True if limit pushdown into the analytic sort is safe, False if not
+   */
+  public boolean isLimitPushdownSafe(SortInfo sortInfo, SelectNode selectNode,
+    long limit, SortNode analyticNodeSort, List<Expr> sortExprsForPartitioning,
+    Analyzer analyzer) {
+    if (analyticFnCalls_.size() != 1) return false;
+    Expr expr = analyticFnCalls_.get(0);
+    if (!(expr instanceof FunctionCallExpr) ||
+         (!AnalyticExpr.isRankingFn(((FunctionCallExpr) expr).getFn()))) {
+      return false;
+    }
+    List<Expr> analyticSortSortExprs = analyticNodeSort.getSortInfo().getSortExprs();
+
+    // In the mapping below, we use the original sort exprs that the sortInfo was
+    // created with, not the sort exprs that got mapped in SortInfo.createSortTupleInfo().
+    // This allows us to substitute it using this node's output smap.
+    List<Expr> origSortExprs = sortInfo != null ? sortInfo.getOrigSortExprs() :
+            new ArrayList<>();
+    List<Expr> sortExprs = Expr.substituteList(origSortExprs, getOutputSmap(),
+            analyzer, false);
+    // Also use substituted partition exprs such that they can be compared with the
+    // sort exprs
+    List<Expr> pbExprs = substitutedPartitionExprs_;
+
+    if (sortExprs.size() == 0) {
+      // if there is no sort expr in the parent sort but only limit, we can push
+      // the limit to the sort below if there is no selection node or if
+      // the predicate in the selection node is eligible
+      if (selectNode == null) {
+        return true;
+      }
+      Pair<Boolean, Double> status =
+              isPredEligibleForLimitPushdown(selectNode.getConjuncts(), limit);
+      if (status.first) {
+        sortExprsForPartitioning.addAll(analyticSortSortExprs);
+        selectNode.setSelectivity(status.second);
+        return true;
+      }
+      return false;
+    }
+
+    if (sortExprs.size() > 0 && pbExprs.size() > sortExprs.size()) return false;
+
+    Preconditions.checkArgument(analyticSortSortExprs.size() >= pbExprs.size());
+    // Check if pby exprs are a prefix of the top level sort exprs
+    if (sortExprs.size() == 0) {
+      sortExprsForPartitioning.addAll(pbExprs);
+    } else {
+      for (int i = 0; i < pbExprs.size(); i++) {
+        Expr pbExpr = pbExprs.get(i);
+        Expr sortExpr = sortExprs.get(i);
+        if (!(pbExpr instanceof SlotRef && sortExpr instanceof SlotRef)) return false;
+
+        if (!((SlotRef) pbExpr).equals(((SlotRef) sortExpr))) {
+          // pby exprs are not a prefix of the top level sort exprs
+          return false;
+        } else {
+          // get the corresponding sort expr from the analytic sort
+          // since that's what will eventually be used for hash partitioning
+          sortExprsForPartitioning.add(analyticSortSortExprs.get(i));
+        }
+        // check the ASC/DESC and NULLS FIRST/LAST compatibility.
+        if (!sortInfo.getIsAscOrder().get(i) || sortInfo.getNullsFirst().get(i)) {
+          return false;
+        }
+      }
+    }
+
+    // check that the window frame is UNBOUNDED PRECEDING to CURRENT ROW
+    if (!(analyticWindow_.getLeftBoundary().getType() ==
+          AnalyticWindow.BoundaryType.UNBOUNDED_PRECEDING
+          && analyticWindow_.getRightBoundary().getType() ==
+            AnalyticWindow.BoundaryType.CURRENT_ROW)) {
+      return false;
+    }
+
+    if (selectNode == null) {
+      return true;
+    } else {
+      Pair<Boolean, Double> status =
+              isPredEligibleForLimitPushdown(selectNode.getConjuncts(), limit);
+      if (status.first) {
+        selectNode.setSelectivity(status.second);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check eligibility of a predicate (provided as list of conjuncts) for limit
+   * pushdown optimization.
+   * @param conjuncts list of conjuncts from the predicate
+   * @param limit limit from outer sort
+   * @return a Pair whose first value is True if the conjuncts+limit allows pushdown,
+   *   False otherwise. Second value is the predicate's estimated selectivity
+   */
+  private Pair<Boolean, Double> isPredEligibleForLimitPushdown(List<Expr> conjuncts,
+        long limit) {
+    Pair<Boolean, Double> falseStatus = new Pair<>(false, -1.0);
+    // Currently, only single conjuncts are supported. In the future, multiple
+    // conjuncts involving a range, e.g. 'col >= 10 AND col <= 20', could
+    // potentially be supported.
+    if (conjuncts.size() > 1) return falseStatus;
+    Expr conj = conjuncts.get(0);
+    if (!(Expr.IS_BINARY_PREDICATE.apply(conj))) return falseStatus;
+    BinaryPredicate pred = (BinaryPredicate) conj;
+    Expr lhs = pred.getChild(0);
+    Expr rhs = pred.getChild(1);
+    // Lhs of the binary predicate must be a ranking function.
+    // Also, it must be bound to the output tuple of this analytic eval node
+    if (!(lhs instanceof SlotRef)) {
+      return falseStatus;
+    }
+    List<Expr> lhsSourceExprs = ((SlotRef) lhs).getDesc().getSourceExprs();
+    if (lhsSourceExprs.size() > 1 ||
+          !(lhsSourceExprs.get(0) instanceof AnalyticExpr)) {
+      return falseStatus;
+    }
+    if (!(AnalyticExpr.isRankingFn(((AnalyticExpr) lhsSourceExprs.
+          get(0)).getFnCall().getFn()))
+          || !lhs.isBound(outputTupleDesc_.getId())) {
+      return falseStatus;
+    }
+    // Restrict the pushdown for =, <, <= predicates because these ensure the
+    // qualifying rows are fully 'contained' within the LIMIT value. Other
+    // types of predicates would select rows that fall outside the LIMIT range.
+    if (!(pred.getOp() == BinaryPredicate.Operator.EQ ||
+          pred.getOp() == BinaryPredicate.Operator.LT ||
+          pred.getOp() == BinaryPredicate.Operator.LE)) {
+      return falseStatus;
+    }
+    // Rhs of the predicate must be a numeric literal and its value
+    // must be less than or equal to the limit.
+    if (!(rhs instanceof NumericLiteral) ||
+          ((NumericLiteral)rhs).getLongValue() > limit) {
+      return falseStatus;
+    }
+    double selectivity = Expr.DEFAULT_SELECTIVITY;
+    // Since the predicate is qualified for limit pushdown, estimate its selectivity.
+    // For EQ conditions, leave it as the default.  For LT and LE, assume all of the
+    // 'limit' rows will be returned.
+    if (pred.getOp() == BinaryPredicate.Operator.LT ||
+            pred.getOp() == BinaryPredicate.Operator.LE) {
+      selectivity = 1.0;
+    }
+    return new Pair<Boolean, Double>(true, selectivity);
+  }
+
 }
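
As a standalone restatement of the eligibility rules above, here is a
simplified sketch (hypothetical values, not the Impala classes): only =,
< and <= against a numeric literal no larger than the limit qualify,
since those keep every qualifying row within the first 'limit' rows of
the analytic sort order.

  final class EligibilityDemo {
    enum Op { EQ, LT, LE, GT, GE, NE }

    // Simplified mirror of isPredEligibleForLimitPushdown(): 'op' is the
    // comparison on the ranking function, 'rhs' its literal bound and
    // 'limit' the outer LIMIT.
    static boolean eligible(Op op, long rhs, long limit) {
      boolean containedOp = (op == Op.EQ || op == Op.LT || op == Op.LE);
      return containedOp && rhs <= limit;
    }

    public static void main(String[] args) {
      System.out.println(eligible(Op.LE, 10, 20)); // true: rn <= 10, LIMIT 20
      System.out.println(eligible(Op.LE, 30, 20)); // false: bound exceeds LIMIT
      System.out.println(eligible(Op.GT, 5, 20));  // false: '>' selects a suffix
    }
  }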
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 63990e6..57e38be 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -340,16 +340,18 @@ public class AnalyticPlanner {
     for (OrderByElement elmt : orderByElements) {
       isConstSort = isConstSort && elmt.getExpr().isConstant();
     }
+
+    SortNode sortNode = null;
     // sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates
     if (hasActivePartition || !isConstSort) {
       // first sort on partitionExprs (direction doesn't matter)
       List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
+      // for PB exprs use ASC, NULLS LAST to match the behavior of the default
+      // order-by and to ensure that limit pushdown works correctly
       List<Boolean> isAsc =
           Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true)));
-      // TODO: utilize a direction and nulls/first last that has benefit
-      // for subsequent sort groups
       List<Boolean> nullsFirst =
-          Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true)));
+          Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(false)));
 
       // then sort on orderByExprs
       for (OrderByElement orderByElement: sortGroup.orderByElements) {
@@ -365,7 +367,7 @@ public class AnalyticPlanner {
       SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
       // IMPALA-8533: Avoid generating sort with empty tuple descriptor
       if(sortInfo.getSortTupleDescriptor().getSlots().size() > 0) {
-        SortNode sortNode =
+        sortNode =
             SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, 0);
 
         // if this sort group does not have partitioning exprs, we want the sort
@@ -386,6 +388,7 @@ public class AnalyticPlanner {
       }
     }
 
+    AnalyticEvalNode lowestAnalyticNode = null;
     // create one AnalyticEvalNode per window group
     for (WindowGroup windowGroup: sortGroup.windowGroups) {
       root = new AnalyticEvalNode(ctx_.getNextNodeId(), root,
@@ -394,7 +397,12 @@ public class AnalyticPlanner {
           windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple,
           windowGroup.logicalToPhysicalSmap);
       root.init(analyzer_);
+      if (lowestAnalyticNode == null) {
+        lowestAnalyticNode = (AnalyticEvalNode) root;
+        if (sortNode != null) sortNode.setAnalyticEvalNode(lowestAnalyticNode);
+      }
     }
+
     return root;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index f8fdd60..7496bd1 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -99,7 +99,8 @@ public class DistributedPlanner {
       // allow child fragments to be partitioned, unless they contain a limit clause
       // (the result set with the limit constraint needs to be computed centrally);
       // merge later if needed
-      boolean childIsPartitioned = !child.hasLimit();
+      boolean childIsPartitioned = child.allowPartitioned();
+
       // Do not fragment the subplan of a SubplanNode since it is executed locally.
       if (root instanceof SubplanNode && child == root.getChild(1)) continue;
       childFragments.add(createPlanFragments(child, childIsPartitioned, fragments));
@@ -773,6 +774,11 @@ public class DistributedPlanner {
     childFragment.setDestination(exchangeNode);
   }
 
+  private PlanFragment createParentFragment(
+      PlanFragment childFragment, DataPartition parentPartition) throws ImpalaException {
+    return createParentFragment(childFragment, parentPartition, false);
+  }
+
   /**
    * Create a new fragment containing a single ExchangeNode that consumes the output
    * of childFragment, set the destination of childFragment to the new parent
@@ -784,11 +790,12 @@ public class DistributedPlanner {
    * correct for the input).
    */
   private PlanFragment createParentFragment(
-      PlanFragment childFragment, DataPartition parentPartition)
+      PlanFragment childFragment, DataPartition parentPartition, boolean unsetLimit)
       throws ImpalaException {
     ExchangeNode exchangeNode =
         new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
     exchangeNode.init(ctx_.getRootAnalyzer());
+    if (unsetLimit) exchangeNode.unsetLimit();
     PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
         exchangeNode, parentPartition);
     childFragment.setDestination(exchangeNode);
@@ -1019,16 +1026,47 @@ public class DistributedPlanner {
     SortNode sortNode = (SortNode) node;
     Preconditions.checkState(sortNode.isAnalyticSort());
     PlanFragment analyticFragment = childFragment;
+
+    boolean addedLowerTopN = false;
+    SortNode lowerTopN = null;
+    AnalyticEvalNode analyticNode = sortNode.getAnalyticEvalNode();
     if (sortNode.getInputPartition() != null) {
       sortNode.getInputPartition().substitute(
           childFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer());
       // Make sure the childFragment's output is partitioned as required by the sortNode.
       DataPartition sortPartition = sortNode.getInputPartition();
       if (!childFragment.getDataPartition().equals(sortPartition)) {
-        analyticFragment = createParentFragment(childFragment, sortPartition);
+        if (sortNode.hasLimit() && sortNode.isTypeTopN()) {
+          lowerTopN = sortNode;
+          childFragment.addPlanRoot(lowerTopN);
+          addedLowerTopN = true;
+          DataPartition hashPartition =
+            DataPartition.hashPartitioned(lowerTopN.getPartitioningExprs());
+          // When creating the analytic fragment, pass in a flag to unset the limit
+          // on the partition exchange. This ensures that the exchange does not
+          // prematurely stop sending rows in case there's a downstream operator
+          // that has a LIMIT - for instance a Sort with LIMIT after the
+          // Analytic operator.
+          analyticFragment = createParentFragment(childFragment, hashPartition,
+             true);
+        } else {
+          analyticFragment = createParentFragment(childFragment, sortPartition);
+        }
       }
     }
-    analyticFragment.addPlanRoot(sortNode);
+    if (addedLowerTopN) {
+      // Create the upper TopN node
+      SortNode upperTopN = SortNode.createTopNSortNode(ctx_.getNextNodeId(),
+              childFragment.getPlanRoot(), lowerTopN.getSortInfo(), sortNode.getOffset());
+      upperTopN.setIsAnalyticSort(true);
+      upperTopN.init(ctx_.getRootAnalyzer());
+      upperTopN.setLimit(lowerTopN.getLimit());
+      // connect this to the analytic eval node
+      analyticNode.setChild(0, upperTopN);
+      analyticFragment.addPlanRoot(upperTopN);
+    } else {
+      analyticFragment.addPlanRoot(sortNode);
+    }
     return analyticFragment;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index ce492dd..30dc5a4 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -990,4 +990,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public void setDisableCodegen(boolean disableCodegen) {
     disableCodegen_ = disableCodegen;
   }
+
+  public boolean allowPartitioned() { return !hasLimit(); }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index a5437c0..48d033a 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -36,9 +36,12 @@ import com.google.common.base.Preconditions;
  */
 public class SelectNode extends PlanNode {
   private final static Logger LOG = LoggerFactory.getLogger(SelectNode.class);
+  // in some optimizations the selectivity may be set explicitly
+  private double selectivity_;
 
   protected SelectNode(PlanNodeId id, PlanNode child, List<Expr> conjuncts) {
     super(id, "SELECT");
+    selectivity_ = -1.0;
     addChild(child);
     conjuncts_.addAll(conjuncts);
     computeTupleIds();
@@ -81,6 +84,16 @@ public class SelectNode extends PlanNode {
   }
 
   @Override
+  protected double computeSelectivity() {
+    if (selectivity_ == -1) {
+      return super.computeSelectivity();
+    }
+    return selectivity_;
+  }
+
+  public void setSelectivity(double value) { selectivity_ = value; }
+
+  @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // The select node initializes a single row-batch which it recycles on every
     // GetNext() call made to its child node. The memory attached to that
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index b1401cf..b7bcf4c 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -322,6 +322,9 @@ public class SingleNodePlanner {
     } else {
       root.setLimit(stmt.getLimit());
       root.computeStats(analyzer);
+      if (root.hasLimit()) {
+        checkAndApplyLimitPushdown(root, null, root.getLimit(), analyzer);
+      }
     }
 
     return root;
@@ -337,6 +340,10 @@ public class SingleNodePlanner {
     SortNode sortNode;
     long topNBytesLimit = ctx_.getQueryOptions().topn_bytes_limit;
 
+    if (hasLimit && offset == 0) {
+      checkAndApplyLimitPushdown(root, sortInfo, limit, analyzer);
+    }
+
     if (hasLimit && !disableTopN) {
       if (topNBytesLimit <= 0) {
         sortNode =
@@ -361,10 +368,84 @@ public class SingleNodePlanner {
     Preconditions.checkState(sortNode.hasValidStats());
     sortNode.setLimit(limit);
     sortNode.init(analyzer);
+
     return sortNode;
   }
 
   /**
+   * For certain qualifying conditions, we can push a limit from the top level
+   * sort down to the sort associated with an AnalyticEval node.
+   */
+  private void checkAndApplyLimitPushdown(PlanNode root, SortInfo sortInfo, long limit,
+      Analyzer analyzer) {
+    boolean pushdownLimit = false;
+    AnalyticEvalNode analyticNode = null;
+    List<PlanNode> intermediateNodes = new ArrayList<>();
+    List<Expr>  partitioningExprs = new ArrayList<>();
+    SortNode analyticNodeSort = null;
+    PlanNode descendant = findDescendantAnalyticNode(root, intermediateNodes);
+    if (descendant != null && intermediateNodes.size() <= 1) {
+      Preconditions.checkArgument(descendant instanceof AnalyticEvalNode);
+      analyticNode = (AnalyticEvalNode) descendant;
+      if (!(analyticNode.getChild(0) instanceof SortNode)) {
+        // if the over() clause is empty, there won't be a child SortNode
+        // so limit pushdown is not applicable
+        return;
+      }
+      analyticNodeSort = (SortNode) analyticNode.getChild(0);
+      int numNodes = intermediateNodes.size();
+      if (numNodes > 1 ||
+              (numNodes == 1 && !(intermediateNodes.get(0) instanceof SelectNode))) {
+        pushdownLimit = false;
+      } else if (numNodes == 0) {
+        pushdownLimit = analyticNode.isLimitPushdownSafe(sortInfo, null,
+            limit, analyticNodeSort, partitioningExprs, ctx_.getRootAnalyzer());
+      } else {
+        SelectNode selectNode = (SelectNode) intermediateNodes.get(0);
+        pushdownLimit = analyticNode.isLimitPushdownSafe(sortInfo, selectNode,
+            limit, analyticNodeSort, partitioningExprs, ctx_.getRootAnalyzer());
+      }
+    }
+
+    if (pushdownLimit) {
+      Preconditions.checkArgument(analyticNode != null);
+      Preconditions.checkArgument(analyticNode.getChild(0) instanceof SortNode);
+      analyticNodeSort.convertToTopN(limit, partitioningExprs, analyzer);
+      // after the limit is pushed down, update stats for the analytic eval node
+      // and intermediate nodes
+      analyticNode.computeStats(analyzer);
+      for (PlanNode n : intermediateNodes) {
+        n.computeStats(analyzer);
+      }
+    }
+  }
+
+  /**
+   * Starting from the supplied root PlanNode, traverse the descendants
+   * to find the first AnalyticEvalNode. If a blocking node such as a
+   * Join, Aggregate, or Sort is encountered, return null. The
+   * 'intermediateNodes' list is populated with the nodes encountered during
+   * traversal.
+   */
+  private PlanNode findDescendantAnalyticNode(PlanNode root,
+    List<PlanNode> intermediateNodes) {
+    if (root == null || root instanceof AnalyticEvalNode) {
+      return root;
+    }
+    // If we encounter a blocking operator (sort, aggregate, join), or a Subplan,
+    // there's no need to go further. Also, we bail early if we encounter multi-input
+    // operator such as union-all.  In the future, we could potentially extend the
+    // limit pushdown to both sides of a union-all
+    if (root instanceof SortNode || root instanceof AggregationNode ||
+          root instanceof JoinNode || root instanceof SubplanNode ||
+          root.getChildren().size() > 1) {
+      return null;
+    }
+    intermediateNodes.add(root);
+    return findDescendantAnalyticNode(root.getChild(0), intermediateNodes);
+  }
+
+  /**
    * If there are unassigned conjuncts that are bound by tupleIds or if there are slot
    * equivalences for tupleIds that have not yet been enforced, returns a SelectNode on
    * top of root that evaluates those conjuncts; otherwise returns root unchanged.
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 9eb8bb2..f8645ab 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -34,7 +34,6 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import org.apache.impala.thrift.TSortNode;
 import org.apache.impala.thrift.TSortType;
-import org.apache.impala.thrift.TSortingOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +64,13 @@ public class SortNode extends PlanNode {
   // if true, the output of this node feeds an AnalyticNode
   private boolean isAnalyticSort_;
 
+  // if this is an analytic sort, this points to the corresponding
+  // analytic eval node; otherwise null
+  private AnalyticEvalNode analyticEvalNode_;
+
+  // set only for the analytic sort node
+  private List<Expr> partitioningExprs_;
+
   // info_.sortTupleSlotExprs_ substituted with the outputSmap_ for materialized slots
   // in init().
   private List<Expr> resolvedTupleExprs_;
@@ -73,7 +79,7 @@ public class SortNode extends PlanNode {
   protected long offset_;
 
   // The type of sort. Determines the exec node used in the BE.
-  private final TSortType type_;
+  private TSortType type_;
 
   // Estimated bytes of input that will go into this sort node across all backends.
   // Used for sorter spill estimation in backend code.
@@ -115,7 +121,7 @@ public class SortNode extends PlanNode {
   public long getOffset() { return offset_; }
   public void setOffset(long offset) { offset_ = offset; }
   public boolean hasOffset() { return offset_ > 0; }
-  public boolean useTopN() { return type_ == TSortType.TOPN; }
+  public boolean isTypeTopN() { return type_ == TSortType.TOPN; }
   public SortInfo getSortInfo() { return info_; }
   public void setInputPartition(DataPartition inputPartition) {
     inputPartition_ = inputPartition;
@@ -123,6 +129,30 @@ public class SortNode extends PlanNode {
   public DataPartition getInputPartition() { return inputPartition_; }
   public boolean isAnalyticSort() { return isAnalyticSort_; }
   public void setIsAnalyticSort(boolean v) { isAnalyticSort_ = v; }
+  public void setAnalyticEvalNode(AnalyticEvalNode n) { analyticEvalNode_ = n; }
+  public AnalyticEvalNode getAnalyticEvalNode() { return analyticEvalNode_; }
+
+  /**
+   * In certain cases, the planner may decide to convert a total sort into a
+   * TopN sort with a limit.
+   */
+  public void convertToTopN(long limit, List<Expr> partitioningExprs,
+      Analyzer analyzer) {
+    Preconditions.checkArgument(type_ == TSortType.TOTAL);
+    type_ = TSortType.TOPN;
+    displayName_ = getDisplayName(type_);
+    setLimit(limit);
+    partitioningExprs_ = partitioningExprs;
+    computeStats(analyzer);
+  }
+
+  public List<Expr> getPartitioningExprs() { return partitioningExprs_; }
+
+  @Override
+  public boolean allowPartitioned() {
+    if (isAnalyticSort_ && hasLimit()) return true;
+    return super.allowPartitioned();
+  }
 
   @Override
   public boolean isBlockingNode() { return type_ != TSortType.PARTIAL; }
@@ -197,6 +227,8 @@ public class SortNode extends PlanNode {
 
   @Override
   protected void toThrift(TPlanNode msg) {
+    Preconditions.checkState(!isTypeTopN() || hasLimit(), "Top-N must have limit");
+    Preconditions.checkState(offset_ >= 0);
     msg.node_type = TPlanNodeType.SORT_NODE;
     TSortInfo sort_info = new TSortInfo(Expr.treesToThrift(info_.getSortExprs()),
         info_.getIsAscOrder(), info_.getNullsFirst(), info_.getSortingOrder());
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 875a740..0e4ed25 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1073,4 +1073,12 @@ public class PlannerTest extends PlannerTestBase {
   public void testAcidTableScans() {
     runPlannerTestFile("acid-scans", "functional_orc_def");
   }
+
+  /**
+   * Test limit pushdown into analytic sort under applicable conditions
+   */
+  @Test
+  public void testLimitPushdownAnalytic() {
+    runPlannerTestFile("limit-pushdown-analytic");
+  }
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns-mt-dop.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns-mt-dop.test
index a8ca680..dc81606 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns-mt-dop.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns-mt-dop.test
@@ -14,7 +14,7 @@ PLAN-ROOT SINK
 |  row-size=28B cardinality=2.88M
 |
 03:SORT
-|  order by: ss_sold_date_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST
+|  order by: ss_sold_date_sk ASC NULLS LAST, ss_store_sk ASC NULLS LAST
 |  row-size=20B cardinality=2.88M
 |
 02:ANALYTIC
@@ -23,11 +23,11 @@ PLAN-ROOT SINK
 |  row-size=20B cardinality=2.88M
 |
 01:SORT
-|  order by: ss_addr_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST
+|  order by: ss_addr_sk ASC NULLS LAST, ss_store_sk ASC NULLS LAST
 |  row-size=12B cardinality=2.88M
 |
 00:SCAN HDFS [tpcds_parquet.store_sales]
-   HDFS partitions=1824/1824 files=1824 size=196.96MB
+   HDFS partitions=1824/1824 files=1824 size=200.95MB
    row-size=12B cardinality=2.88M
 ---- PARALLELPLANS
 PLAN-ROOT SINK
@@ -40,7 +40,7 @@ PLAN-ROOT SINK
 |  row-size=28B cardinality=2.88M
 |
 03:SORT
-|  order by: ss_sold_date_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST
+|  order by: ss_sold_date_sk ASC NULLS LAST, ss_store_sk ASC NULLS LAST
 |  row-size=20B cardinality=2.88M
 |
 06:EXCHANGE [HASH(ss_sold_date_sk,ss_store_sk)]
@@ -51,13 +51,13 @@ PLAN-ROOT SINK
 |  row-size=20B cardinality=2.88M
 |
 01:SORT
-|  order by: ss_addr_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST
+|  order by: ss_addr_sk ASC NULLS LAST, ss_store_sk ASC NULLS LAST
 |  row-size=12B cardinality=2.88M
 |
 05:EXCHANGE [HASH(ss_addr_sk,ss_store_sk)]
 |
 00:SCAN HDFS [tpcds_parquet.store_sales]
-   HDFS partitions=1824/1824 files=1824 size=196.96MB
+   HDFS partitions=1824/1824 files=1824 size=200.95MB
    row-size=12B cardinality=2.88M
 ====
 # Negative case for IMPALA-9000 - ss_item_sk has NDV=17975 and thus partitioning on it
@@ -74,7 +74,7 @@ PLAN-ROOT SINK
 |  row-size=32B cardinality=2.88M
 |
 03:SORT
-|  order by: ss_sold_date_sk ASC NULLS FIRST, ss_item_sk ASC NULLS FIRST
+|  order by: ss_sold_date_sk ASC NULLS LAST, ss_item_sk ASC NULLS LAST
 |  row-size=24B cardinality=2.88M
 |
 02:ANALYTIC
@@ -83,11 +83,11 @@ PLAN-ROOT SINK
 |  row-size=24B cardinality=2.88M
 |
 01:SORT
-|  order by: ss_addr_sk ASC NULLS FIRST, ss_item_sk ASC NULLS FIRST
+|  order by: ss_addr_sk ASC NULLS LAST, ss_item_sk ASC NULLS LAST
 |  row-size=16B cardinality=2.88M
 |
 00:SCAN HDFS [tpcds_parquet.store_sales]
-   HDFS partitions=1824/1824 files=1824 size=196.96MB
+   HDFS partitions=1824/1824 files=1824 size=200.95MB
    row-size=16B cardinality=2.88M
 ---- PARALLELPLANS
 PLAN-ROOT SINK
@@ -100,7 +100,7 @@ PLAN-ROOT SINK
 |  row-size=32B cardinality=2.88M
 |
 03:SORT
-|  order by: ss_sold_date_sk ASC NULLS FIRST, ss_item_sk ASC NULLS FIRST
+|  order by: ss_sold_date_sk ASC NULLS LAST, ss_item_sk ASC NULLS LAST
 |  row-size=24B cardinality=2.88M
 |
 02:ANALYTIC
@@ -109,12 +109,12 @@ PLAN-ROOT SINK
 |  row-size=24B cardinality=2.88M
 |
 01:SORT
-|  order by: ss_addr_sk ASC NULLS FIRST, ss_item_sk ASC NULLS FIRST
+|  order by: ss_addr_sk ASC NULLS LAST, ss_item_sk ASC NULLS LAST
 |  row-size=16B cardinality=2.88M
 |
 05:EXCHANGE [HASH(ss_item_sk)]
 |
 00:SCAN HDFS [tpcds_parquet.store_sales]
-   HDFS partitions=1824/1824 files=1824 size=196.96MB
+   HDFS partitions=1824/1824 files=1824 size=200.95MB
    row-size=16B cardinality=2.88M
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index ce1b11e..1b27eec 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -15,7 +15,7 @@ PLAN-ROOT SINK
 |  row-size=6B cardinality=100
 |
 02:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  row-size=5B cardinality=100
 |
 05:AGGREGATE [FINALIZE]
@@ -49,7 +49,7 @@ PLAN-ROOT SINK
 |  row-size=14B cardinality=7.30K
 |
 03:SORT
-|  order by: int_col ASC NULLS FIRST, tinyint_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST, tinyint_col ASC NULLS LAST
 |  row-size=10B cardinality=7.30K
 |
 02:ANALYTIC
@@ -58,7 +58,7 @@ PLAN-ROOT SINK
 |  row-size=10B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, bool_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |  row-size=6B cardinality=7.30K
 |
 05:EXCHANGE [HASH(int_col)]
@@ -92,7 +92,7 @@ PLAN-ROOT SINK
 |  row-size=8B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -120,7 +120,7 @@ PLAN-ROOT SINK
 |  row-size=8B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  row-size=4B cardinality=7.30K
 |
 05:EXCHANGE [HASH(int_col)]
@@ -165,7 +165,7 @@ PLAN-ROOT SINK
 |  row-size=26B cardinality=7.30K
 |
 05:SORT
-|  order by: bool_col ASC NULLS FIRST, bigint_col ASC, tinyint_col ASC
+|  order by: bool_col ASC NULLS LAST, bigint_col ASC, tinyint_col ASC
 |  row-size=22B cardinality=7.30K
 |
 04:ANALYTIC
@@ -176,7 +176,7 @@ PLAN-ROOT SINK
 |  row-size=22B cardinality=7.30K
 |
 03:SORT
-|  order by: int_col ASC NULLS FIRST, bigint_col DESC
+|  order by: int_col ASC NULLS LAST, bigint_col DESC
 |  row-size=18B cardinality=7.30K
 |
 02:ANALYTIC
@@ -187,7 +187,7 @@ PLAN-ROOT SINK
 |  row-size=18B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, bigint_col ASC
+|  order by: int_col ASC NULLS LAST, bigint_col ASC
 |  row-size=14B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -218,7 +218,7 @@ PLAN-ROOT SINK
 |  row-size=26B cardinality=7.30K
 |
 05:SORT
-|  order by: bool_col ASC NULLS FIRST, bigint_col ASC, tinyint_col ASC
+|  order by: bool_col ASC NULLS LAST, bigint_col ASC, tinyint_col ASC
 |  row-size=22B cardinality=7.30K
 |
 10:EXCHANGE [HASH(bool_col)]
@@ -231,7 +231,7 @@ PLAN-ROOT SINK
 |  row-size=22B cardinality=7.30K
 |
 03:SORT
-|  order by: int_col ASC NULLS FIRST, bigint_col DESC
+|  order by: int_col ASC NULLS LAST, bigint_col DESC
 |  row-size=18B cardinality=7.30K
 |
 02:ANALYTIC
@@ -242,7 +242,7 @@ PLAN-ROOT SINK
 |  row-size=18B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, bigint_col ASC
+|  order by: int_col ASC NULLS LAST, bigint_col ASC
 |  row-size=14B cardinality=7.30K
 |
 09:EXCHANGE [HASH(int_col)]
@@ -300,7 +300,7 @@ PLAN-ROOT SINK
 |  row-size=47B cardinality=7.30K
 |
 07:SORT
-|  order by: bool_col ASC NULLS FIRST, bigint_col ASC
+|  order by: bool_col ASC NULLS LAST, bigint_col ASC
 |  row-size=31B cardinality=7.30K
 |
 06:ANALYTIC
@@ -311,7 +311,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=7.30K
 |
 05:SORT
-|  order by: bool_col ASC NULLS FIRST, int_col ASC
+|  order by: bool_col ASC NULLS LAST, int_col ASC
 |  row-size=27B cardinality=7.30K
 |
 04:ANALYTIC
@@ -322,7 +322,7 @@ PLAN-ROOT SINK
 |  row-size=27B cardinality=7.30K
 |
 03:SORT
-|  order by: int_col ASC NULLS FIRST, smallint_col ASC NULLS FIRST, bigint_col ASC
+|  order by: int_col ASC NULLS LAST, smallint_col ASC NULLS LAST, bigint_col ASC
 |  row-size=19B cardinality=7.30K
 |
 02:ANALYTIC
@@ -333,7 +333,7 @@ PLAN-ROOT SINK
 |  row-size=19B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, smallint_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST, smallint_col ASC NULLS LAST
 |  row-size=15B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -367,7 +367,7 @@ PLAN-ROOT SINK
 |  row-size=47B cardinality=7.30K
 |
 07:SORT
-|  order by: bool_col ASC NULLS FIRST, bigint_col ASC
+|  order by: bool_col ASC NULLS LAST, bigint_col ASC
 |  row-size=31B cardinality=7.30K
 |
 06:ANALYTIC
@@ -378,7 +378,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=7.30K
 |
 05:SORT
-|  order by: bool_col ASC NULLS FIRST, int_col ASC
+|  order by: bool_col ASC NULLS LAST, int_col ASC
 |  row-size=27B cardinality=7.30K
 |
 13:EXCHANGE [HASH(bool_col)]
@@ -391,7 +391,7 @@ PLAN-ROOT SINK
 |  row-size=27B cardinality=7.30K
 |
 03:SORT
-|  order by: int_col ASC NULLS FIRST, smallint_col ASC NULLS FIRST, bigint_col ASC
+|  order by: int_col ASC NULLS LAST, smallint_col ASC NULLS LAST, bigint_col ASC
 |  row-size=19B cardinality=7.30K
 |
 02:ANALYTIC
@@ -402,7 +402,7 @@ PLAN-ROOT SINK
 |  row-size=19B cardinality=7.30K
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, smallint_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST, smallint_col ASC NULLS LAST
 |  row-size=15B cardinality=7.30K
 |
 12:EXCHANGE [HASH(int_col,smallint_col)]
@@ -448,7 +448,7 @@ PLAN-ROOT SINK
 |  row-size=17B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST
 |  row-size=9B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -465,7 +465,7 @@ PLAN-ROOT SINK
 |  row-size=17B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST
 |  row-size=9B cardinality=11.00K
 |
 03:EXCHANGE [HASH(tinyint_col)]
@@ -530,7 +530,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=10
 |
 01:SORT
-|  order by: tinyint_col + 1 ASC NULLS FIRST, double_col / 2 ASC NULLS FIRST, 4 - int_col ASC, 4 * smallint_col ASC
+|  order by: tinyint_col + 1 ASC NULLS LAST, double_col / 2 ASC NULLS LAST, 4 - int_col ASC, 4 * smallint_col ASC
 |  row-size=23B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -551,7 +551,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=10
 |
 01:SORT
-|  order by: tinyint_col + 1 ASC NULLS FIRST, double_col / 2 ASC NULLS FIRST, 4 - int_col ASC, 4 * smallint_col ASC
+|  order by: tinyint_col + 1 ASC NULLS LAST, double_col / 2 ASC NULLS LAST, 4 - int_col ASC, 4 * smallint_col ASC
 |  row-size=23B cardinality=11.00K
 |
 03:EXCHANGE [HASH(tinyint_col + 1,double_col / 2)]
@@ -607,7 +607,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.30K
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST, int_col DESC
+|  order by: bool_col ASC NULLS LAST, int_col DESC
 |  row-size=21B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -645,7 +645,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.30K
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST, int_col DESC
+|  order by: bool_col ASC NULLS LAST, int_col DESC
 |  row-size=21B cardinality=7.30K
 |
 06:EXCHANGE [HASH(bool_col)]
@@ -674,7 +674,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=99
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, int_col DESC
 |  row-size=21B cardinality=99
 |
 02:HASH JOIN [INNER JOIN]
@@ -708,7 +708,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=99
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, int_col DESC
 |  row-size=21B cardinality=99
 |
 07:EXCHANGE [HASH(a.tinyint_col)]
@@ -758,7 +758,7 @@ PLAN-ROOT SINK
 |  row-size=50B cardinality=2
 |
 04:SORT
-|  order by: min(tinyint_col) ASC NULLS FIRST, max(int_col) ASC
+|  order by: min(tinyint_col) ASC NULLS LAST, max(int_col) ASC
 |  row-size=34B cardinality=2
 |
 03:ANALYTIC
@@ -769,7 +769,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=2
 |
 02:SORT
-|  order by: min(tinyint_col) ASC NULLS FIRST, sum(int_col) ASC
+|  order by: min(tinyint_col) ASC NULLS LAST, sum(int_col) ASC
 |  row-size=26B cardinality=2
 |
 01:AGGREGATE [FINALIZE]
@@ -798,7 +798,7 @@ PLAN-ROOT SINK
 |  row-size=50B cardinality=2
 |
 04:SORT
-|  order by: min(tinyint_col) ASC NULLS FIRST, max(int_col) ASC
+|  order by: min(tinyint_col) ASC NULLS LAST, max(int_col) ASC
 |  row-size=34B cardinality=2
 |
 03:ANALYTIC
@@ -809,7 +809,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=2
 |
 02:SORT
-|  order by: min(tinyint_col) ASC NULLS FIRST, sum(int_col) ASC
+|  order by: min(tinyint_col) ASC NULLS LAST, sum(int_col) ASC
 |  row-size=26B cardinality=2
 |
 09:EXCHANGE [HASH(min(tinyint_col))]
@@ -898,7 +898,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=11.00K
 |
 06:SORT
-|  order by: tinyint_col ASC NULLS FIRST, double_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, double_col ASC NULLS LAST, int_col DESC
 |  row-size=27B cardinality=11.00K
 |
 05:ANALYTIC
@@ -916,7 +916,7 @@ PLAN-ROOT SINK
 |  row-size=23B cardinality=11.00K
 |
 03:SORT
-|  order by: double_col ASC NULLS FIRST, tinyint_col ASC NULLS FIRST, int_col ASC
+|  order by: double_col ASC NULLS LAST, tinyint_col ASC NULLS LAST, int_col ASC
 |  row-size=19B cardinality=11.00K
 |
 02:ANALYTIC
@@ -927,7 +927,7 @@ PLAN-ROOT SINK
 |  row-size=19B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, int_col DESC
 |  row-size=15B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -967,7 +967,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=11.00K
 |
 06:SORT
-|  order by: tinyint_col ASC NULLS FIRST, double_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, double_col ASC NULLS LAST, int_col DESC
 |  row-size=27B cardinality=11.00K
 |
 05:ANALYTIC
@@ -985,7 +985,7 @@ PLAN-ROOT SINK
 |  row-size=23B cardinality=11.00K
 |
 03:SORT
-|  order by: double_col ASC NULLS FIRST, tinyint_col ASC NULLS FIRST, int_col ASC
+|  order by: double_col ASC NULLS LAST, tinyint_col ASC NULLS LAST, int_col ASC
 |  row-size=19B cardinality=11.00K
 |
 02:ANALYTIC
@@ -996,7 +996,7 @@ PLAN-ROOT SINK
 |  row-size=19B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, int_col DESC
 |  row-size=15B cardinality=11.00K
 |
 11:EXCHANGE [HASH(tinyint_col)]
@@ -1047,7 +1047,7 @@ PLAN-ROOT SINK
 |  row-size=39B cardinality=11.00K
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, double_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, double_col ASC NULLS LAST, int_col DESC
 |  row-size=31B cardinality=11.00K
 |
 02:ANALYTIC
@@ -1058,7 +1058,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=11.00K
 |
 01:SORT
-|  order by: bigint_col ASC NULLS FIRST, tinyint_col ASC
+|  order by: bigint_col ASC NULLS LAST, tinyint_col ASC
 |  row-size=23B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -1093,7 +1093,7 @@ PLAN-ROOT SINK
 |  row-size=39B cardinality=11.00K
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, double_col ASC NULLS FIRST, int_col DESC
+|  order by: tinyint_col ASC NULLS LAST, double_col ASC NULLS LAST, int_col DESC
 |  row-size=31B cardinality=11.00K
 |
 09:EXCHANGE [HASH(tinyint_col,double_col)]
@@ -1106,7 +1106,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=11.00K
 |
 01:SORT
-|  order by: bigint_col ASC NULLS FIRST, tinyint_col ASC
+|  order by: bigint_col ASC NULLS LAST, tinyint_col ASC
 |  row-size=23B cardinality=11.00K
 |
 08:EXCHANGE [HASH(bigint_col)]
@@ -1141,7 +1141,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.30K
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=21B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1166,7 +1166,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.30K
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=21B cardinality=7.30K
 |
 05:EXCHANGE [HASH(functional.alltypes.bool_col)]
@@ -1202,7 +1202,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.30K
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=21B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1227,7 +1227,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.30K
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=21B cardinality=7.30K
 |
 05:EXCHANGE [HASH(functional.alltypes.bool_col)]
@@ -1262,7 +1262,7 @@ PLAN-ROOT SINK
 |  row-size=42B cardinality=7.30K
 |
 03:SORT
-|  order by: bool_col ASC NULLS FIRST, string_col ASC
+|  order by: bool_col ASC NULLS LAST, string_col ASC
 |  row-size=34B cardinality=7.30K
 |
 02:ANALYTIC
@@ -1271,7 +1271,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=7.30K
 |
 01:SORT
-|  order by: bigint_col ASC NULLS FIRST
+|  order by: bigint_col ASC NULLS LAST
 |  row-size=26B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1294,7 +1294,7 @@ PLAN-ROOT SINK
 |  row-size=42B cardinality=7.30K
 |
 03:SORT
-|  order by: bool_col ASC NULLS FIRST, string_col ASC
+|  order by: bool_col ASC NULLS LAST, string_col ASC
 |  row-size=34B cardinality=7.30K
 |
 07:EXCHANGE [HASH(bool_col)]
@@ -1305,7 +1305,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=7.30K
 |
 01:SORT
-|  order by: bigint_col ASC NULLS FIRST
+|  order by: bigint_col ASC NULLS LAST
 |  row-size=26B cardinality=7.30K
 |
 06:EXCHANGE [HASH(bigint_col)]
@@ -1343,7 +1343,7 @@ PLAN-ROOT SINK
 |  |  row-size=24B cardinality=8
 |  |
 |  10:SORT
-|  |  order by: int_col ASC NULLS FIRST, id ASC
+|  |  order by: int_col ASC NULLS LAST, id ASC
 |  |  row-size=16B cardinality=8
 |  |
 |  09:SCAN HDFS [functional.alltypestiny]
@@ -1363,7 +1363,7 @@ PLAN-ROOT SINK
 |  |  row-size=9B cardinality=100
 |  |
 |  05:SORT
-|  |  order by: bool_col ASC NULLS FIRST
+|  |  order by: bool_col ASC NULLS LAST
 |  |  row-size=5B cardinality=100
 |  |
 |  04:SCAN HDFS [functional.alltypessmall]
@@ -1376,7 +1376,7 @@ PLAN-ROOT SINK
 |  row-size=12B cardinality=7.30K
 |
 02:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  row-size=8B cardinality=7.30K
 |
 01:SCAN HDFS [functional.alltypes]
@@ -1404,7 +1404,7 @@ PLAN-ROOT SINK
 |  |  row-size=24B cardinality=8
 |  |
 |  10:SORT
-|  |  order by: int_col ASC NULLS FIRST, id ASC
+|  |  order by: int_col ASC NULLS LAST, id ASC
 |  |  row-size=16B cardinality=8
 |  |
 |  17:EXCHANGE [HASH(int_col)]
@@ -1432,7 +1432,7 @@ PLAN-ROOT SINK
 |  |  row-size=9B cardinality=100
 |  |
 |  05:SORT
-|  |  order by: bool_col ASC NULLS FIRST
+|  |  order by: bool_col ASC NULLS LAST
 |  |  row-size=5B cardinality=100
 |  |
 |  14:EXCHANGE [HASH(bool_col)]
@@ -1447,7 +1447,7 @@ PLAN-ROOT SINK
 |  row-size=12B cardinality=7.30K
 |
 02:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  row-size=8B cardinality=7.30K
 |
 13:EXCHANGE [HASH(int_col)]
@@ -1475,7 +1475,7 @@ PLAN-ROOT SINK
 |  |  row-size=21B cardinality=1
 |  |
 |  02:SORT
-|  |  order by: bool_col ASC NULLS FIRST
+|  |  order by: bool_col ASC NULLS LAST
 |  |  row-size=13B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny t2]
@@ -1505,7 +1505,7 @@ PLAN-ROOT SINK
 |  |  row-size=21B cardinality=1
 |  |
 |  02:SORT
-|  |  order by: bool_col ASC NULLS FIRST
+|  |  order by: bool_col ASC NULLS LAST
 |  |  row-size=13B cardinality=1
 |  |
 |  05:EXCHANGE [HASH(bool_col)]
@@ -1576,7 +1576,7 @@ PLAN-ROOT SINK
 |  row-size=37B cardinality=730
 |
 03:SORT
-|  order by: bigint_col ASC NULLS FIRST, id ASC
+|  order by: bigint_col ASC NULLS LAST, id ASC
 |  row-size=29B cardinality=730
 |
 02:ANALYTIC
@@ -1585,7 +1585,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=730
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=17B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1620,7 +1620,7 @@ PLAN-ROOT SINK
 |  row-size=37B cardinality=730
 |
 03:SORT
-|  order by: bigint_col ASC NULLS FIRST, id ASC
+|  order by: bigint_col ASC NULLS LAST, id ASC
 |  row-size=29B cardinality=730
 |
 09:EXCHANGE [HASH(bigint_col)]
@@ -1631,7 +1631,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=730
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=17B cardinality=730
 |
 08:EXCHANGE [HASH(bool_col)]
@@ -1673,7 +1673,7 @@ PLAN-ROOT SINK
 |  row-size=24B cardinality=7.30K
 |
 01:SORT
-|  order by: bigint_col ASC NULLS FIRST, id ASC
+|  order by: bigint_col ASC NULLS LAST, id ASC
 |  row-size=16B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1713,7 +1713,7 @@ PLAN-ROOT SINK
 |  row-size=24B cardinality=7.30K
 |
 01:SORT
-|  order by: bigint_col ASC NULLS FIRST, id ASC
+|  order by: bigint_col ASC NULLS LAST, id ASC
 |  row-size=16B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1735,7 +1735,7 @@ PLAN-ROOT SINK
 |  row-size=13B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC
+|  order by: tinyint_col ASC NULLS LAST, id ASC
 |  row-size=5B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -1770,7 +1770,7 @@ PLAN-ROOT SINK
 |  row-size=31B cardinality=11.00K
 |
 07:SORT
-|  order by: int_col ASC NULLS FIRST, id ASC
+|  order by: int_col ASC NULLS LAST, id ASC
 |  row-size=27B cardinality=11.00K
 |
 06:ANALYTIC
@@ -1788,7 +1788,7 @@ PLAN-ROOT SINK
 |  row-size=23B cardinality=11.00K
 |
 04:SORT
-|  order by: smallint_col ASC NULLS FIRST, id ASC
+|  order by: smallint_col ASC NULLS LAST, id ASC
 |  row-size=19B cardinality=11.00K
 |
 03:ANALYTIC
@@ -1806,7 +1806,7 @@ PLAN-ROOT SINK
 |  row-size=15B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC
+|  order by: tinyint_col ASC NULLS LAST, id ASC
 |  row-size=11B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -1857,7 +1857,7 @@ PLAN-ROOT SINK
 |  row-size=42B cardinality=11.00K
 |
 05:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC NULLS LAST, bool_col DESC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST, id ASC NULLS LAST, bool_col DESC NULLS FIRST
 |  row-size=34B cardinality=11.00K
 |
 04:ANALYTIC
@@ -1868,7 +1868,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=11.00K
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC, int_col ASC
+|  order by: tinyint_col ASC NULLS LAST, id ASC, int_col ASC
 |  row-size=26B cardinality=11.00K
 |
 02:ANALYTIC
@@ -1879,7 +1879,7 @@ PLAN-ROOT SINK
 |  row-size=26B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id DESC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST, id DESC NULLS FIRST
 |  row-size=18B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -1937,7 +1937,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=11.00K
 |
 05:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC NULLS LAST, bool_col DESC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST, id ASC NULLS LAST, bool_col DESC NULLS FIRST
 |  row-size=30B cardinality=11.00K
 |
 04:ANALYTIC
@@ -1948,7 +1948,7 @@ PLAN-ROOT SINK
 |  row-size=30B cardinality=11.00K
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC, int_col ASC
+|  order by: tinyint_col ASC NULLS LAST, id ASC, int_col ASC
 |  row-size=22B cardinality=11.00K
 |
 02:ANALYTIC
@@ -1959,7 +1959,7 @@ PLAN-ROOT SINK
 |  row-size=22B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id DESC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST, id DESC NULLS FIRST
 |  row-size=18B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -2009,7 +2009,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=11.00K
 |
 05:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC NULLS LAST, bool_col DESC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST, id ASC NULLS LAST, bool_col DESC NULLS FIRST
 |  row-size=30B cardinality=11.00K
 |
 04:ANALYTIC
@@ -2020,7 +2020,7 @@ PLAN-ROOT SINK
 |  row-size=30B cardinality=11.00K
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC, int_col ASC
+|  order by: tinyint_col ASC NULLS LAST, id ASC, int_col ASC
 |  row-size=22B cardinality=11.00K
 |
 02:ANALYTIC
@@ -2031,7 +2031,7 @@ PLAN-ROOT SINK
 |  row-size=22B cardinality=11.00K
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id DESC NULLS FIRST
+|  order by: tinyint_col ASC NULLS LAST, id DESC NULLS FIRST
 |  row-size=18B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
@@ -2176,7 +2176,7 @@ PLAN-ROOT SINK
 |  row-size=20B cardinality=3.65K
 |
 01:SORT
-|  order by: year ASC NULLS FIRST, id ASC
+|  order by: year ASC NULLS LAST, id ASC
 |  row-size=12B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -2209,7 +2209,7 @@ PLAN-ROOT SINK
 |  row-size=42B cardinality=3.65K
 |
 07:SORT
-|  order by: year ASC NULLS FIRST, tinyint_col ASC NULLS FIRST
+|  order by: year ASC NULLS LAST, tinyint_col ASC NULLS LAST
 |  row-size=34B cardinality=3.65K
 |
 06:ANALYTIC
@@ -2220,7 +2220,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=3.65K
 |
 05:SORT
-|  order by: int_col ASC NULLS FIRST, year ASC NULLS FIRST, id ASC
+|  order by: int_col ASC NULLS LAST, year ASC NULLS LAST, id ASC
 |  row-size=30B cardinality=3.65K
 |
 04:ANALYTIC
@@ -2231,7 +2231,7 @@ PLAN-ROOT SINK
 |  row-size=30B cardinality=3.65K
 |
 03:SORT
-|  order by: tinyint_col ASC NULLS FIRST, id ASC NULLS FIRST, year ASC NULLS FIRST, bigint_col ASC
+|  order by: tinyint_col ASC NULLS LAST, id ASC NULLS LAST, year ASC NULLS LAST, bigint_col ASC
 |  row-size=22B cardinality=3.65K
 |
 02:ANALYTIC
@@ -2242,7 +2242,7 @@ PLAN-ROOT SINK
 |  row-size=22B cardinality=3.65K
 |
 01:SORT
-|  order by: id ASC NULLS FIRST, year ASC NULLS FIRST, int_col ASC
+|  order by: id ASC NULLS LAST, year ASC NULLS LAST, int_col ASC
 |  row-size=21B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -2285,7 +2285,7 @@ PLAN-ROOT SINK
 |  row-size=33B cardinality=7.30K
 |
 03:SORT
-|  order by: year ASC NULLS FIRST, bigint_col ASC
+|  order by: year ASC NULLS LAST, bigint_col ASC
 |  row-size=25B cardinality=7.30K
 |
 02:ANALYTIC
@@ -2296,7 +2296,7 @@ PLAN-ROOT SINK
 |  row-size=25B cardinality=7.30K
 |
 01:SORT
-|  order by: year ASC NULLS FIRST, tinyint_col ASC NULLS FIRST, bigint_col ASC
+|  order by: year ASC NULLS LAST, tinyint_col ASC NULLS LAST, bigint_col ASC
 |  row-size=17B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -2325,7 +2325,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=1
 |
 01:SORT
-|  order by: month ASC NULLS FIRST, tinyint_col ASC NULLS FIRST, bigint_col ASC
+|  order by: month ASC NULLS LAST, tinyint_col ASC NULLS LAST, bigint_col ASC
 |  row-size=21B cardinality=1
 |
 00:SCAN HDFS [functional.alltypestiny]
@@ -2360,7 +2360,7 @@ PLAN-ROOT SINK
 |  row-size=29B cardinality=7.81K
 |
 03:SORT
-|  order by: tinyint_col + 1 ASC NULLS FIRST, int_col - 1 ASC NULLS FIRST, bigint_col ASC
+|  order by: tinyint_col + 1 ASC NULLS LAST, int_col - 1 ASC NULLS LAST, bigint_col ASC
 |  row-size=21B cardinality=7.81K
 |
 02:HASH JOIN [INNER JOIN]
@@ -2402,7 +2402,7 @@ PLAN-ROOT SINK
 |  row-size=24B cardinality=8
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, bigint_col ASC
+|  order by: int_col ASC NULLS LAST, bigint_col ASC
 |  row-size=16B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny]
@@ -2458,7 +2458,7 @@ PLAN-ROOT SINK
 |  row-size=34B cardinality=8
 |
 03:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=26B cardinality=8
 |
 02:HASH JOIN [RIGHT OUTER JOIN]
@@ -2523,7 +2523,7 @@ PLAN-ROOT SINK
 |  row-size=30B cardinality=8
 |
 03:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=22B cardinality=8
 |
 02:HASH JOIN [RIGHT OUTER JOIN]
@@ -2621,7 +2621,7 @@ PLAN-ROOT SINK
 |  row-size=50B cardinality=8
 |
 05:SORT
-|  order by: id ASC NULLS FIRST
+|  order by: id ASC NULLS LAST
 |  row-size=42B cardinality=8
 |
 04:HASH JOIN [RIGHT OUTER JOIN]
@@ -2638,7 +2638,7 @@ PLAN-ROOT SINK
 |  row-size=37B cardinality=8
 |
 02:SORT
-|  order by: id ASC NULLS FIRST
+|  order by: id ASC NULLS LAST
 |  row-size=29B cardinality=8
 |
 01:SCAN HDFS [functional.alltypestiny b]
@@ -2745,7 +2745,7 @@ PLAN-ROOT SINK
 |  row-size=16B cardinality=100
 |
 03:SORT
-|  order by: id ASC NULLS FIRST
+|  order by: id ASC NULLS LAST
 |  row-size=8B cardinality=100
 |
 07:EXCHANGE [HASH(t1.id)]
@@ -2815,7 +2815,7 @@ PLAN-ROOT SINK
 |  row-size=93B cardinality=100
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=89B cardinality=100
 |
 03:EXCHANGE [HASH(bool_col)]
@@ -2839,7 +2839,7 @@ PLAN-ROOT SINK
 |  row-size=93B cardinality=100
 |
 01:SORT
-|  order by: bool_col ASC NULLS FIRST
+|  order by: bool_col ASC NULLS LAST
 |  row-size=89B cardinality=100
 |
 03:EXCHANGE [HASH(bool_col)]
@@ -2865,7 +2865,7 @@ PLAN-ROOT SINK
 |  row-size=33B cardinality=8
 |
 01:SORT
-|  order by: abs(int_col) ASC NULLS FIRST, string_col ASC NULLS FIRST, id ASC
+|  order by: abs(int_col) ASC NULLS LAST, string_col ASC NULLS LAST, id ASC
 |  row-size=29B cardinality=8
 |
 03:EXCHANGE [HASH(abs(int_col),string_col)]
@@ -2941,7 +2941,7 @@ PLAN-ROOT SINK
 |  row-size=33B cardinality=100
 |
 02:SORT
-|  order by: int_col ASC NULLS FIRST, count(bigint_col) ASC
+|  order by: int_col ASC NULLS LAST, count(bigint_col) ASC
 |  row-size=25B cardinality=100
 |
 05:AGGREGATE [FINALIZE]
@@ -2980,7 +2980,7 @@ PLAN-ROOT SINK
 |  row-size=33B cardinality=100
 |
 03:SORT
-|  order by: int_col ASC NULLS FIRST, count(bigint_col) ASC
+|  order by: int_col ASC NULLS LAST, count(bigint_col) ASC
 |  row-size=25B cardinality=100
 |
 09:EXCHANGE [HASH(int_col)]
@@ -3042,7 +3042,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer t]
-   HDFS partitions=1/1 files=4 size=288.99MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=20B cardinality=150.00K
 ====
 # IMPALA-8718: No more collection slots in the output of the inline view of analytics.
@@ -3083,10 +3083,10 @@ PLAN-ROOT SINK
 |  |     row-size=0B cardinality=10
 |  |
 |  01:SCAN HDFS [tpch_nested_parquet.customer t]
-|     HDFS partitions=1/1 files=4 size=288.99MB
+|     HDFS partitions=1/1 files=4 size=289.02MB
 |     row-size=20B cardinality=150.00K
 |
 00:SCAN HDFS [tpch_nested_parquet.customer leftside]
-   HDFS partitions=1/1 files=4 size=288.99MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=38B cardinality=150.00K
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 6eeb8c0..c942508 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -369,7 +369,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
+|  order by: concat('ab', string_col) ASC NULLS LAST, greatest(20, bigint_col) ASC
 |  materialized: concat('ab', string_col), greatest(20, bigint_col)
 |  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=45B cardinality=7.30K
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test b/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test
index f643507..2a10421 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test
@@ -365,7 +365,7 @@ PLAN-ROOT SINK
 |  limit: 5
 |  row-size=28B cardinality=5
 |
-01:SORT
+01:TOP-N [LIMIT=5]
 |  order by: if(l_quantity < 5 OR l_quantity > 45, 'invalid', 'valid') ASC
 |  row-size=20B cardinality=6.00M
 |
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
index 60e0b78..25f7ea7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
@@ -1761,7 +1761,7 @@ PLAN-ROOT SINK
 |  |  row-size=16B cardinality=8
 |  |
 |  02:SORT
-|  |  order by: bigint_col ASC NULLS FIRST
+|  |  order by: bigint_col ASC NULLS LAST
 |  |  row-size=12B cardinality=8
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index 19483fe..3ba3674 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -10,7 +10,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 5
-   partitions=1/24 files=1 size=20.36KB
+   HDFS partitions=1/24 files=1 size=20.36KB
    row-size=81B cardinality=310
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -21,7 +21,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 5
-   partitions=1/24 files=1 size=20.36KB
+   HDFS partitions=1/24 files=1 size=20.36KB
    row-size=81B cardinality=310
 ====
 # insert into a static partition
@@ -37,7 +37,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,4
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 5
-   partitions=1/24 files=1 size=20.36KB
+   HDFS partitions=1/24 files=1 size=20.36KB
    row-size=81B cardinality=310
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -48,7 +48,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,4
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 5
-   partitions=1/24 files=1 size=20.36KB
+   HDFS partitions=1/24 files=1 size=20.36KB
    row-size=81B cardinality=310
 ====
 # overwrite a static partition
@@ -64,7 +64,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=true, PARTITION-KEYS=(2009,4)
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 5
-   partitions=1/24 files=1 size=20.36KB
+   HDFS partitions=1/24 files=1 size=20.36KB
    row-size=81B cardinality=310
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -75,7 +75,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=true, PARTITION-KEYS=(2009,4)
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 5
-   partitions=1/24 files=1 size=20.36KB
+   HDFS partitions=1/24 files=1 size=20.36KB
    row-size=81B cardinality=310
 ====
 # insert into fully dynamic partitions
@@ -95,7 +95,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,m
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=89B cardinality=610
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -113,7 +113,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,m
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=89B cardinality=610
 ====
 # IMPALA-5293: noclustered hint prevents adding sort node
@@ -129,7 +129,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(`year`
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=89B cardinality=610
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -143,7 +143,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(`year`
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=89B cardinality=610
 ====
 # insert into fully dynamic partitions. The source table has no stats and the insert
@@ -160,26 +160,26 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(int_co
 |
 01:SORT
 |  order by: int_col ASC NULLS LAST, int_col ASC NULLS LAST
-|  row-size=72B cardinality=unavailable
+|  row-size=72B cardinality=520
 |
 00:SCAN HDFS [functional_seq_snap.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=11.34KB
-   row-size=72B cardinality=unavailable
+   HDFS partitions=2/24 files=2 size=11.34KB
+   row-size=72B cardinality=520
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(int_col,int_col)]
 |  partitions=unavailable
 |
 02:SORT
 |  order by: int_col ASC NULLS LAST, int_col ASC NULLS LAST
-|  row-size=72B cardinality=unavailable
+|  row-size=72B cardinality=520
 |
 01:EXCHANGE [HASH(int_col,int_col)]
 |
 00:SCAN HDFS [functional_seq_snap.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=11.34KB
-   row-size=72B cardinality=unavailable
+   HDFS partitions=2/24 files=2 size=11.34KB
+   row-size=72B cardinality=520
 ====
 # insert into fully dynamic partitions;
 # partitioned output doesn't require repartitioning
@@ -206,7 +206,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(`year`
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=89B cardinality=610
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -234,7 +234,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(`year`
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=89B cardinality=610
 ====
 # insert into a partially dynamic partition
@@ -254,7 +254,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,m
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=85B cardinality=610
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -272,7 +272,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,m
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=85B cardinality=610
 ====
 # insert into a partially dynamic partition
@@ -300,7 +300,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,`
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=85B cardinality=610
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -328,7 +328,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,`
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` > 10
-   partitions=2/24 files=2 size=40.07KB
+   HDFS partitions=2/24 files=2 size=40.07KB
    row-size=85B cardinality=610
 ====
 # insert into a partially dynamic partition
@@ -348,7 +348,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,4
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` > 2009, `month` = 4
-   partitions=1/24 files=1 size=19.71KB
+   HDFS partitions=1/24 files=1 size=19.71KB
    row-size=85B cardinality=300
 ---- SCANRANGELOCATIONS
 NODE 0:
@@ -363,7 +363,7 @@ WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,4
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` > 2009, `month` = 4
-   partitions=1/24 files=1 size=19.71KB
+   HDFS partitions=1/24 files=1 size=19.71KB
    row-size=85B cardinality=300
 ====
 # insert with limit from partitioned table.
@@ -377,7 +377,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 1
-   partitions=1/24 files=1 size=19.95KB
+   HDFS partitions=1/24 files=1 size=19.95KB
    limit: 10
    row-size=81B cardinality=10
 ---- SCANRANGELOCATIONS
@@ -392,7 +392,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |
 00:SCAN HDFS [functional.alltypes]
    partition predicates: `year` = 2009, `month` = 1
-   partitions=1/24 files=1 size=19.95KB
+   HDFS partitions=1/24 files=1 size=19.95KB
    limit: 10
    row-size=81B cardinality=10
 ====
@@ -502,7 +502,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 |  row-size=21B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=17B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
@@ -521,7 +521,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 |  row-size=21B cardinality=10
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=17B cardinality=7.30K
 ====
 # test static partition insert from a query with distinct grouped aggregation
@@ -544,7 +544,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 |  row-size=17B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=17B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
@@ -573,7 +573,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 |  row-size=17B cardinality=100
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=17B cardinality=7.30K
 ====
 # test that the planner chooses to repartition before the table sink
@@ -592,7 +592,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # test noshuffle hint to prevent repartitioning (same query as above with hint)
@@ -607,7 +607,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # same as above but with traditional commented hint at default hint location
@@ -622,7 +622,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # same as above but with traditional commented hint at Oracle hint location
@@ -637,7 +637,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # same as above but with end-of-line commented hint
@@ -653,7 +653,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # test that the planner does not repartition before the table sink
@@ -672,7 +672,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,1)]
 |  row-size=85B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=85B cardinality=7.30K
 ====
 # test shuffle hint to force repartitioning (same query as above with hint)
@@ -691,7 +691,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,1)]
 01:EXCHANGE [HASH(`year`)]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=85B cardinality=7.30K
 ====
 # test insert/select stmt that contains an analytic function (IMPALA-1400)
@@ -712,11 +712,11 @@ WRITE TO HDFS [functional.alltypestiny, OVERWRITE=false, PARTITION-KEYS=(2009,1)
 |  row-size=97B cardinality=8
 |
 01:SORT
-|  order by: id ASC NULLS FIRST
+|  order by: id ASC NULLS LAST
 |  row-size=81B cardinality=8
 |
 00:SCAN HDFS [functional.alltypestiny]
-   partitions=4/4 files=4 size=460B
+   HDFS partitions=4/4 files=4 size=460B
    row-size=81B cardinality=8
 ====
 # IMPALA-3930: Test insert with shuffle hint on constant partition exprs. The table sink
@@ -732,7 +732,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2009,1)]
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=81B cardinality=7.30K
 ====
 # IMPALA-3930: Same as above but with a dynamic partition insert.
@@ -747,7 +747,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2009,1)]
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=81B cardinality=7.30K
 ====
 # IMPALA-3930: Same as above but with a mix of static/dynamic partition exprs, and
@@ -763,7 +763,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2009,5)]
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=81B cardinality=7.30K
 ====
 # Test insert into an unpartitioned table with shuffle hint.
@@ -778,7 +778,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=81B cardinality=7.30K
 ====
 # IMPALA-5293: ensure insert into partitioned table adds sort node without clustered hint.
@@ -793,7 +793,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
@@ -806,7 +806,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # IMPALA-5293: ensure insert into partitioned table adds sort node without clustered hint.
@@ -821,7 +821,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
@@ -832,7 +832,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  row-size=89B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    row-size=89B cardinality=7.30K
 ====
 # IMPALA-5293: ensure insert into partitioned table adds sort node without clustered hint.
@@ -858,11 +858,11 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
-|     partitions=24/24 files=24 size=478.45KB
+|     HDFS partitions=24/24 files=24 size=478.45KB
 |     row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> int_col
    row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
@@ -893,11 +893,11 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  |  row-size=4B cardinality=1
 |  |
 |  01:SCAN HDFS [functional.alltypes]
-|     partitions=24/24 files=24 size=478.45KB
+|     HDFS partitions=24/24 files=24 size=478.45KB
 |     row-size=4B cardinality=7.30K
 |
 00:SCAN HDFS [functional.alltypes]
-   partitions=24/24 files=24 size=478.45KB
+   HDFS partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> int_col
    row-size=89B cardinality=7.30K
 ====
@@ -909,14 +909,14 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |  partitions=1
 |
 00:SCAN HDFS [functional.alltypesnopart]
-   partitions=1/1 files=0 size=0B
+   HDFS partitions=1/1 files=0 size=0B
    row-size=72B cardinality=0
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |  partitions=1
 |
 00:SCAN HDFS [functional.alltypesnopart]
-   partitions=1/1 files=0 size=0B
+   HDFS partitions=1/1 files=0 size=0B
    row-size=72B cardinality=0
 ====
 # IMPALA-5293: ensure insert into non-partitioned table does not add sort node.
@@ -927,7 +927,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 |  partitions=1
 |
 00:SCAN HDFS [functional.alltypesnopart]
-   partitions=1/1 files=0 size=0B
+   HDFS partitions=1/1 files=0 size=0B
    row-size=72B cardinality=0
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
@@ -936,6 +936,6 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional.alltypesnopart]
-   partitions=1/1 files=0 size=0B
+   HDFS partitions=1/1 files=0 size=0B
    row-size=72B cardinality=0
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/limit-pushdown-analytic.test b/testdata/workloads/functional-planner/queries/PlannerTest/limit-pushdown-analytic.test
new file mode 100644
index 0000000..210615a
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/limit-pushdown-analytic.test
@@ -0,0 +1,782 @@
+# IMPALA-9983
+# Base case. Limit pushdown into analytic sort should be applied
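+# Rows satisfying rk <= 10 sort first within each partition of the analytic
+# sort, so with a matching limit the analytic sort can be converted to a
+# TopN. In the distributed plan this TopN is split into a bottom TopN and
+# an upper TopN separated by a hash partition exchange, so the limit is
+# applied before hash partitioning.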
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  limit: 10
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+06:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+05:EXCHANGE [HASH(int_col)]
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# row_number() predicate on equality instead of range.
+# Limit pushdown into analytic sort should be applied.
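+# The equality predicate bounds the rank from above just as rk <= 5 would,
+# and 5 is within the limit of 10.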
+select * from (
+  select int_col, bigint_col, smallint_col,
+    row_number() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk = 5
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=1
+|
+03:SELECT
+|  predicates: row_number() = 5
+|  row-size=22B cardinality=1
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  limit: 10
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=1
+|
+03:SELECT
+|  predicates: row_number() = 5
+|  row-size=22B cardinality=1
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+06:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+05:EXCHANGE [HASH(int_col)]
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Multi-column partition-by which is a prefix of the sort exprs.
+# Limit pushdown into analytic sort should be applied
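+# Note that the exchange in the distributed plan hash-partitions on both
+# partition-by exprs (int_col, bigint_col).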
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col, bigint_col
+                 order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by int_col, bigint_col, smallint_col, rk
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col, bigint_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  limit: 10
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col, bigint_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+06:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+05:EXCHANGE [HASH(int_col,bigint_col)]
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# No predicate after the analytic function.
+# Limit pushdown should be applied
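+# The partition-by exprs are still a prefix of the outer sort exprs, which
+# is the condition the pushdown checks here.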
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col, bigint_col
+                 order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+order by int_col, bigint_col, smallint_col, rk
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col, bigint_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  limit: 10
+|
+03:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col, bigint_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+05:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+04:EXCHANGE [HASH(int_col,bigint_col)]
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Limit is present without ORDER BY.
+# Limit pushdown should be applied.
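+# Without an outer ORDER BY there is no top-level TopN; the limit lands on
+# the SELECT node, and the analytic sort still becomes a TopN. In this case
+# the exchange hash-partitions on both int_col and smallint_col.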
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  limit: 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:EXCHANGE [UNPARTITIONED]
+|  limit: 10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  limit: 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+05:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+04:EXCHANGE [HASH(int_col,smallint_col)]
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# The rank() predicate's operands are flipped.
+# Limit pushdown should be applied.
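+# The predicate 10 > rk is normalized to rank() < 10, as the plans below
+# show, so the pushdown check still sees a usable upper bound.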
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where 10 > rk
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() < 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  limit: 10
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() < 10
+|  row-size=22B cardinality=10
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+06:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+05:EXCHANGE [HASH(int_col)]
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Partition-by and order-by exprs refer to derived table
+# columns rather than base table columns.
+# Limit pushdown should be applied.
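+# The pushdown matches the partition-by and order-by exprs against the
+# outer sort exprs regardless of whether they come from a base table or,
+# as here, from a grouping derived table.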
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from (select int_col, bigint_col, smallint_col from functional.alltypesagg
+        group by int_col, bigint_col, smallint_col)dt1)dt2
+where rk <= 10
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+05:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+04:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=10
+|
+03:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=10
+|
+02:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=10
+|
+01:AGGREGATE [FINALIZE]
+|  group by: int_col, bigint_col, smallint_col
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# No limit in the query.
+# Limit pushdown should not be applied.
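+# Without a limit there is nothing to push down; both the top-level sort
+# and the analytic sort remain full sorts.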
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by int_col, bigint_col, smallint_col, rk
+---- PLAN
+PLAN-ROOT SINK
+|
+04:SORT
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=1.10K
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Rank predicate's upper bound is greater than the limit.
+# Limit pushdown should not be applied.
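+# Rows with rk between 11 and 20 can still reach the final top 10, so
+# limiting the analytic sort to 10 rows could drop qualifying rows; only
+# the top-level sort becomes a TopN.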
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 20
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 20
+|  row-size=22B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Function is not a ranking function.
+# Limit pushdown should not be applied.
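+# A predicate on min(double_col) says nothing about how many rows of each
+# partition qualify, unlike a predicate on a ranking function.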
+select * from (
+  select int_col, bigint_col, smallint_col,
+    min(double_col) over (partition by int_col
+                          order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: min(double_col) <= 10
+|  row-size=30B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: min(double_col)
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=30B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=22B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=22B cardinality=11.00K
+====
+# Partition-by expr is not a prefix of the sort exprs.
+# Limit pushdown should not be applied.
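+# The outer sort leads with bigint_col, which is not a partition-by expr,
+# so rows outside the analytic sort's first 10 could appear in the final
+# top 10.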
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by bigint_col, int_col, smallint_col, rk
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: bigint_col ASC, int_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# A blocking operator occurs between the top-level TopN
+# operator and the analytic operator.
+# Limit pushdown should not be applied.
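+# The aggregation between the TopN and the analytic must see all rows that
+# satisfy the rank predicate; limiting the analytic sort could change the
+# groups it produces.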
+select int_col, bigint_col, smallint_col, rk from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+group by int_col, bigint_col, smallint_col, rk
+order by int_col, bigint_col, smallint_col, rk
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+05:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+04:AGGREGATE [FINALIZE]
+|  group by: int_col, bigint_col, smallint_col, rank()
+|  row-size=22B cardinality=1.10K
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Rank predicate is not one of <, =, <=.
+# Limit pushdown should not be applied.
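+# Only <, =, and <= predicates are recognized as bounding the rank from
+# above; an IN list is not considered.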
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk in (10, 20, 30)
+order by int_col, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() IN (10, 20, 30)
+|  row-size=22B cardinality=11.00K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# Two ranking functions with different partition-by exprs. The predicate is
+# on the dense_rank() function. If dense_rank() appears as the top-level
+# analytic function, the limit pushdown can be applied; otherwise, it
+# should not be.
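+# In the plan below rank() is evaluated above dense_rank(), so the pushdown
+# is not applied and both analytic sorts remain full sorts.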
+select * from (
+  select int_col, bigint_col, smallint_col,
+    dense_rank() over (partition by bigint_col order by smallint_col desc) rk2,
+    rank() over (partition by int_col order by smallint_col desc) rk1
+  from functional.alltypesagg) dt
+where rk2 <= 10
+order by int_col, bigint_col, smallint_col, rk1, rk2
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+06:TOP-N [LIMIT=10]
+|  order by: int_col ASC, bigint_col ASC, smallint_col ASC, rk1 ASC, rk2 ASC
+|  row-size=30B cardinality=10
+|
+05:SELECT
+|  predicates: dense_rank() <= 10
+|  row-size=30B cardinality=1.10K
+|
+04:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=30B cardinality=11.00K
+|
+03:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=22B cardinality=11.00K
+|
+02:ANALYTIC
+|  functions: dense_rank()
+|  partition by: bigint_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: bigint_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# ASC/DESC direction is different for the top order-by.
+# Limit pushdown should not be applied.
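+# The analytic sort orders int_col ASC NULLS LAST, so a DESC top-level
+# order on int_col cannot reuse its first rows.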
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by int_col DESC, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col DESC, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
+# NULLS FIRST for the top order-by, which differs from the NULLS LAST
+# ordering used for the partition-by expr in the analytic sort.
+# Limit pushdown should not be applied.
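+# This patch changed the analytic sort's partition-by exprs to NULLS LAST,
+# so a NULLS FIRST top-level order no longer matches.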
+select * from (
+  select int_col, bigint_col, smallint_col,
+    rank() over (partition by int_col order by smallint_col desc) rk
+  from functional.alltypesagg) dt
+where rk <= 10
+order by int_col NULLS FIRST, bigint_col, smallint_col, rk
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+04:TOP-N [LIMIT=10]
+|  order by: int_col ASC NULLS FIRST, bigint_col ASC, smallint_col ASC, rk ASC
+|  row-size=22B cardinality=10
+|
+03:SELECT
+|  predicates: rank() <= 10
+|  row-size=22B cardinality=1.10K
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: smallint_col DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=22B cardinality=11.00K
+|
+01:SORT
+|  order by: int_col ASC NULLS LAST, smallint_col DESC
+|  row-size=14B cardinality=11.00K
+|
+00:SCAN HDFS [functional.alltypesagg]
+   HDFS partitions=11/11 files=11 size=814.73KB
+   row-size=14B cardinality=11.00K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
index d00afb8..ab405df 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -107,9 +107,9 @@ Per-Host Resources: mem-estimate=359.29MB mem-reservation=86.00MB thread-reserva
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   HDFS partitions=1/1 files=3 size=194.00MB
+   HDFS partitions=1/1 files=3 size=193.98MB
    stored statistics:
-     table: rows=6.00M size=194.00MB
+     table: rows=6.00M size=193.98MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
@@ -164,9 +164,9 @@ Per-Host Resources: mem-estimate=124.02MB mem-reservation=74.00MB thread-reserva
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   HDFS partitions=1/1 files=3 size=194.00MB
+   HDFS partitions=1/1 files=3 size=193.98MB
    stored statistics:
-     table: rows=6.00M size=194.00MB
+     table: rows=6.00M size=193.98MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
@@ -256,10 +256,10 @@ Per-Host Resources: mem-estimate=99.65MB mem-reservation=66.00MB thread-reservat
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=81.00MB mem-reservation=5.00MB thread-reservation=2 runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   HDFS partitions=1/1 files=3 size=194.00MB
+   HDFS partitions=1/1 files=3 size=193.98MB
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6.00M size=194.00MB
+     table: rows=6.00M size=193.98MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
@@ -308,9 +308,9 @@ Per-Host Resources: mem-estimate=806.43MB mem-reservation=74.00MB thread-reserva
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   HDFS partitions=1/1 files=3 size=194.00MB
+   HDFS partitions=1/1 files=3 size=193.98MB
    stored statistics:
-     table: rows=6.00M size=194.00MB
+     table: rows=6.00M size=193.98MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
@@ -363,9 +363,9 @@ Per-Host Resources: mem-estimate=168.14MB mem-reservation=50.00MB thread-reserva
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   HDFS partitions=1/1 files=3 size=194.00MB
+   HDFS partitions=1/1 files=3 size=193.98MB
    stored statistics:
-     table: rows=6.00M size=194.00MB
+     table: rows=6.00M size=193.98MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=1
@@ -404,7 +404,7 @@ Per-Host Resources: mem-estimate=40.04MB mem-reservation=40.00MB thread-reservat
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  mem-estimate=24.00MB mem-reservation=24.00MB spill-buffer=8.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7.30K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 03a9248..6f7695a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -270,7 +270,7 @@ WRITE TO HDFS [default.ctas_mt_dop_test, OVERWRITE=false]
 |  mem-estimate=100.00KB mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=201.93KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -309,7 +309,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=201.93KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -365,7 +365,7 @@ Per-Instance Resources: mem-estimate=144.00MB mem-reservation=34.02MB thread-res
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=201.93KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -399,13 +399,13 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, id ASC
+|  order by: int_col ASC NULLS LAST, id ASC
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=4 row-size=8B cardinality=unavailable
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=201.93KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -441,7 +441,7 @@ Per-Instance Resources: mem-estimate=10.11MB mem-reservation=10.00MB thread-rese
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST, id ASC
+|  order by: int_col ASC NULLS LAST, id ASC
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=4 row-size=8B cardinality=unavailable
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -454,7 +454,7 @@ Per-Instance Resources: mem-estimate=10.11MB mem-reservation=10.00MB thread-rese
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 Per-Instance Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=201.93KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -523,14 +523,14 @@ PLAN-ROOT SINK
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.17MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
    predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
    stored statistics:
-     table: rows=150.00K size=289.17MB
+     table: rows=150.00K size=289.02MB
      columns missing stats: c_orders
-   extrapolated-rows=disabled max-scan-range-rows=50.11K
+   extrapolated-rows=disabled max-scan-range-rows=50.42K
    parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
    parquet statistics predicates on o: o_orderkey < CAST(5 AS BIGINT)
    parquet statistics predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
@@ -599,14 +599,14 @@ Per-Instance Resources: mem-estimate=104.00MB mem-reservation=104.00MB thread-re
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
-   HDFS partitions=1/1 files=4 size=289.17MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
    predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
    stored statistics:
-     table: rows=150.00K size=289.17MB
+     table: rows=150.00K size=289.02MB
      columns missing stats: c_orders
-   extrapolated-rows=disabled max-scan-range-rows=50.11K
+   extrapolated-rows=disabled max-scan-range-rows=50.42K
    parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
    parquet statistics predicates on o: o_orderkey < CAST(5 AS BIGINT)
    parquet statistics predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
@@ -664,13 +664,13 @@ PLAN-ROOT SINK
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.17MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    stored statistics:
-     table: rows=150.00K size=289.17MB
+     table: rows=150.00K size=289.02MB
      columns missing stats: c_orders, c_orders
-   extrapolated-rows=disabled max-scan-range-rows=50.11K
+   extrapolated-rows=disabled max-scan-range-rows=50.42K
    parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    parquet dictionary predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    mem-estimate=88.00MB mem-reservation=16.00MB thread-reservation=0
@@ -726,13 +726,13 @@ Per-Instance Resources: mem-estimate=89.94MB mem-reservation=17.94MB thread-rese
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
-   HDFS partitions=1/1 files=4 size=289.17MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    stored statistics:
-     table: rows=150.00K size=289.17MB
+     table: rows=150.00K size=289.02MB
      columns missing stats: c_orders, c_orders
-   extrapolated-rows=disabled max-scan-range-rows=50.11K
+   extrapolated-rows=disabled max-scan-range-rows=50.42K
    parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    parquet dictionary predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    mem-estimate=88.00MB mem-reservation=16.00MB thread-reservation=0
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
index 78705c6..3cb59d7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
@@ -109,7 +109,7 @@ PLAN-ROOT SINK
 |     row-size=77B cardinality=10.00K
 |
 05:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    runtime filters: RF000 -> c_nationkey, RF001 -> c.c_comment, RF004 -> c.c_nationkey, RF005 -> c_comment
    row-size=87B cardinality=150.00K
 ====
@@ -1657,7 +1657,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: c_custkey < 10, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < 5
    predicates on o_lineitems: l_linenumber < 3
@@ -1789,7 +1789,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: c.c_custkey = c.c_nationkey, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o.o_orderkey = o.o_shippriority
    predicates on l: l.l_partkey = l.l_linenumber, l.l_partkey = l.l_suppkey
@@ -1961,7 +1961,7 @@ PLAN-ROOT SINK
 |  |  row-size=20B cardinality=10
 |  |
 |  05:SORT
-|  |  order by: `key` ASC NULLS FIRST
+|  |  order by: `key` ASC NULLS LAST
 |  |  row-size=12B cardinality=10
 |  |
 |  04:AGGREGATE [FINALIZE]
@@ -2077,7 +2077,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=44B cardinality=150.00K
 ====
 # IMPALA-2412: Test join ordering in nested subplans. Same as above
@@ -2136,7 +2136,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates: !empty(c.c_orders)
    row-size=44B cardinality=150.00K
 ====
@@ -2193,7 +2193,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=44B cardinality=150.00K
 ====
 # IMPALA-2446: Test predicate assignment when outer join has no conjuncts in
@@ -2340,7 +2340,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=12B cardinality=150.00K
 ====
 # IMPALA-3065/IMPALA-3062: Test correct assignment of !empty() predicates.
@@ -2358,7 +2358,7 @@ PLAN-ROOT SINK
 |  row-size=28B cardinality=1.50M
 |
 |--05:SCAN HDFS [tpch_nested_parquet.customer c2]
-|     HDFS partitions=1/1 files=4 size=289.08MB
+|     HDFS partitions=1/1 files=4 size=289.02MB
 |     row-size=8B cardinality=150.00K
 |
 01:SUBPLAN
@@ -2374,7 +2374,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c1]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    runtime filters: RF000 -> c1.c_custkey
    row-size=20B cardinality=150.00K
 ====
@@ -2412,11 +2412,11 @@ PLAN-ROOT SINK
 |  row-size=40B cardinality=300.00K
 |
 |--01:SCAN HDFS [tpch_nested_parquet.customer c2]
-|     HDFS partitions=1/1 files=4 size=289.08MB
+|     HDFS partitions=1/1 files=4 size=289.02MB
 |     row-size=20B cardinality=150.00K
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c1]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=20B cardinality=150.00K
 ====
 # IMPALA-3084: Test correct assignment of NULL checking predicates
@@ -2441,7 +2441,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=230B cardinality=150.00K
 ====
 # IMPALA-2540: Complex query mixing joins on base tables and nested collections.
@@ -2500,7 +2500,7 @@ PLAN-ROOT SINK
 |  |  |     row-size=2B cardinality=5
 |  |  |
 |  |  01:SCAN HDFS [tpch_nested_parquet.customer t2]
-|  |     HDFS partitions=1/1 files=4 size=289.08MB
+|  |     HDFS partitions=1/1 files=4 size=289.02MB
 |  |     runtime filters: RF004 -> t2.c_custkey
 |  |     row-size=59B cardinality=150.00K
 |  |
@@ -2543,7 +2543,7 @@ PLAN-ROOT SINK
 |     row-size=8B cardinality=2
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    predicates on c_orders: o_orderkey = 6000000
    row-size=20B cardinality=150.00K
 ====
@@ -2594,7 +2594,7 @@ PLAN-ROOT SINK
 |     row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.08MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=20B cardinality=150.00K
 ====
 # IMPALA-1270: SEMI JOIN in subplan with distinct added by planner.
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 5ac65cf..1ea06fb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2993,7 +2993,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7.30K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -3036,7 +3036,7 @@ Per-Host Resources: mem-estimate=10.04MB mem-reservation=10.00MB thread-reservat
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7.30K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -3086,7 +3086,7 @@ Per-Instance Resources: mem-estimate=10.06MB mem-reservation=10.00MB thread-rese
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: int_col ASC NULLS FIRST
+|  order by: int_col ASC NULLS LAST
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7.30K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -5859,7 +5859,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: tinyint_col ASC NULLS FIRST, smallint_col ASC
+|  order by: tinyint_col ASC NULLS LAST, smallint_col ASC
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=5 row-size=3B cardinality=11.00K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -6091,3 +6091,134 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=1B cardinality=1
    in pipelines: <none>
 ====
+# IMPALA-9983: Limit pushdown into analytic sort
+select * from (
+  select l_partkey, l_quantity, l_orderkey,
+    rank() over (partition by l_partkey order by l_orderkey desc) rk
+  from tpch_parquet.lineitem) dt
+where rk <= 100
+order by l_partkey, l_quantity, l_orderkey, rk
+limit 100
+---- PLAN
+Max Per-Host Resource Reservation: Memory=20.00MB Threads=2
+Per-Host Resource Estimates: Memory=84MB
+Analyzed query: SELECT * FROM (SELECT l_partkey, l_quantity, l_orderkey, rank()
+OVER (PARTITION BY l_partkey ORDER BY l_orderkey DESC) rk FROM
+tpch_parquet.lineitem) dt WHERE rk <= CAST(100 AS BIGINT) ORDER BY l_partkey
+ASC, l_quantity ASC, l_orderkey ASC, rk ASC LIMIT CAST(100 AS TINYINT)
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=84.00MB mem-reservation=20.00MB thread-reservation=2
+PLAN-ROOT SINK
+|  output exprs: l_partkey, l_quantity, l_orderkey, rk
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+04:TOP-N [LIMIT=100]
+|  order by: l_partkey ASC, l_quantity ASC, l_orderkey ASC, rk ASC
+|  mem-estimate=3.12KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4 row-size=32B cardinality=100
+|  in pipelines: 04(GETNEXT), 01(OPEN)
+|
+03:SELECT
+|  predicates: rank() <= CAST(100 AS BIGINT)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=8,7 row-size=32B cardinality=100
+|  in pipelines: 01(GETNEXT)
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: l_partkey
+|  order by: l_orderkey DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8,7 row-size=32B cardinality=100
+|  in pipelines: 01(GETNEXT)
+|
+01:TOP-N [LIMIT=100]
+|  order by: l_partkey ASC NULLS LAST, l_orderkey DESC
+|  mem-estimate=2.34KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=24B cardinality=100
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.98MB
+   stored statistics:
+     table: rows=6.00M size=193.98MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=20.00MB Threads=4
+Per-Host Resource Estimates: Memory=84MB
+Analyzed query: SELECT * FROM (SELECT l_partkey, l_quantity, l_orderkey, rank()
+OVER (PARTITION BY l_partkey ORDER BY l_orderkey DESC) rk FROM
+tpch_parquet.lineitem) dt WHERE rk <= CAST(100 AS BIGINT) ORDER BY l_partkey
+ASC, l_quantity ASC, l_orderkey ASC, rk ASC LIMIT CAST(100 AS TINYINT)
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: l_partkey, l_quantity, l_orderkey, rk
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: l_partkey ASC, l_quantity ASC, l_orderkey ASC, rk ASC
+|  limit: 100
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4 row-size=32B cardinality=100
+|  in pipelines: 04(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(l_partkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+04:TOP-N [LIMIT=100]
+|  order by: l_partkey ASC, l_quantity ASC, l_orderkey ASC, rk ASC
+|  mem-estimate=3.12KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4 row-size=32B cardinality=100
+|  in pipelines: 04(GETNEXT), 06(OPEN)
+|
+03:SELECT
+|  predicates: rank() <= CAST(100 AS BIGINT)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=8,7 row-size=32B cardinality=100
+|  in pipelines: 06(GETNEXT)
+|
+02:ANALYTIC
+|  functions: rank()
+|  partition by: l_partkey
+|  order by: l_orderkey DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8,7 row-size=32B cardinality=100
+|  in pipelines: 06(GETNEXT)
+|
+06:TOP-N [LIMIT=100]
+|  order by: l_partkey ASC NULLS LAST, l_orderkey DESC
+|  mem-estimate=2.34KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=24B cardinality=100
+|  in pipelines: 06(GETNEXT), 01(OPEN)
+|
+05:EXCHANGE [HASH(l_partkey)]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=24B cardinality=100
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=2
+01:TOP-N [LIMIT=100]
+|  order by: l_partkey ASC NULLS LAST, l_orderkey DESC
+|  mem-estimate=2.34KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=24B cardinality=100
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=193.98MB
+   stored statistics:
+     table: rows=6.00M size=193.98MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index 8943cd8..6292d80 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -1058,7 +1058,7 @@ PLAN-ROOT SINK
 |  row-size=20B cardinality=11.00K
 |
 03:SORT
-|  order by: year ASC NULLS FIRST, month DESC
+|  order by: year ASC NULLS LAST, month DESC
 |  row-size=12B cardinality=11.00K
 |
 02:HASH JOIN [INNER JOIN]
@@ -1102,7 +1102,7 @@ PLAN-ROOT SINK
 |  row-size=20B cardinality=3.65K
 |
 01:SORT
-|  order by: year ASC NULLS FIRST, id ASC
+|  order by: year ASC NULLS LAST, id ASC
 |  row-size=12B cardinality=3.65K
 |
 00:SCAN HDFS [functional.alltypes]
@@ -1266,7 +1266,7 @@ PLAN-ROOT SINK
 |     row-size=32B cardinality=1
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=289.05MB
+   HDFS partitions=1/1 files=4 size=289.02MB
    row-size=32B cardinality=150.00K
 ====
 # Two-way join query where the build side is optimized into an empty set
@@ -1709,7 +1709,7 @@ PLAN-ROOT SINK
 |  row-size=419B cardinality=4
 |
 |--00:SCAN HDFS [tpch_parquet.lineitem]
-|     HDFS partitions=1/1 files=3 size=193.99MB
+|     HDFS partitions=1/1 files=3 size=193.98MB
 |     predicates: l_orderkey = 965
 |     row-size=231B cardinality=4
 |
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/semi-join-distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/semi-join-distinct.test
index 1e15873..f4f4137 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/semi-join-distinct.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/semi-join-distinct.test
@@ -76,9 +76,9 @@ PLAN-ROOT SINK
 |     row-size=89B cardinality=8
 |
 01:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=202.01KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    runtime filters: RF000 -> int_col
-   row-size=4B cardinality=12.85K
+   row-size=4B cardinality=12.94K
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
@@ -102,9 +102,9 @@ PLAN-ROOT SINK
 03:EXCHANGE [HASH(int_col)]
 |
 01:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=202.01KB
+   HDFS partitions=24/24 files=24 size=203.33KB
    runtime filters: RF000 -> int_col
-   row-size=4B cardinality=12.85K
+   row-size=4B cardinality=12.94K
 ====
 # IMPALA-1270: distinct should be added to subquery automatically because
 # it would reduce cardinality significantly.
@@ -912,7 +912,7 @@ PLAN-ROOT SINK
 |  |  row-size=16B cardinality=7.30K
 |  |
 |  02:SORT
-|  |  order by: int_col ASC NULLS FIRST, id ASC
+|  |  order by: int_col ASC NULLS LAST, id ASC
 |  |  row-size=8B cardinality=7.30K
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
@@ -957,7 +957,7 @@ PLAN-ROOT SINK
 |  |  row-size=16B cardinality=7.30K
 |  |
 |  02:SORT
-|  |  order by: int_col ASC NULLS FIRST, id ASC
+|  |  order by: int_col ASC NULLS LAST, id ASC
 |  |  row-size=8B cardinality=7.30K
 |  |
 |  06:EXCHANGE [HASH(int_col)]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
index afbd646..005b709 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
@@ -253,7 +253,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: default.testfn(double_col) ASC NULLS FIRST, random() ASC
+|  order by: default.testfn(double_col) ASC NULLS LAST, random() ASC
 |  materialized: default.testfn(double_col), random()
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6 row-size=24B cardinality=8
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
index 63f0b8f..b5d9a20 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
@@ -3543,7 +3543,7 @@ PLAN-ROOT SINK
 |  row-size=40B cardinality=6
 |
 08:SORT
-|  order by: i_manager_id ASC NULLS FIRST
+|  order by: i_manager_id ASC NULLS LAST
 |  row-size=24B cardinality=6
 |
 07:AGGREGATE [FINALIZE]
@@ -3608,7 +3608,7 @@ PLAN-ROOT SINK
 |  row-size=40B cardinality=6
 |
 08:SORT
-|  order by: i_manager_id ASC NULLS FIRST
+|  order by: i_manager_id ASC NULLS LAST
 |  row-size=24B cardinality=6
 |
 16:AGGREGATE [FINALIZE]
@@ -3686,7 +3686,7 @@ PLAN-ROOT SINK
 |  row-size=40B cardinality=6
 |
 08:SORT
-|  order by: i_manager_id ASC NULLS FIRST
+|  order by: i_manager_id ASC NULLS LAST
 |  row-size=24B cardinality=6
 |
 16:AGGREGATE [FINALIZE]
@@ -5077,7 +5077,7 @@ PLAN-ROOT SINK
 |  row-size=137B cardinality=10.67K
 |
 08:SORT
-|  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST
+|  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST
 |  row-size=121B cardinality=10.67K
 |
 07:AGGREGATE [FINALIZE]
@@ -5141,7 +5141,7 @@ PLAN-ROOT SINK
 |  row-size=137B cardinality=10.67K
 |
 08:SORT
-|  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST
+|  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST
 |  row-size=121B cardinality=10.67K
 |
 16:AGGREGATE [FINALIZE]
@@ -5218,7 +5218,7 @@ PLAN-ROOT SINK
 |  row-size=137B cardinality=10.67K
 |
 08:SORT
-|  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST
+|  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST
 |  row-size=121B cardinality=10.67K
 |
 16:AGGREGATE [FINALIZE]
@@ -5512,7 +5512,7 @@ PLAN-ROOT SINK
 |  row-size=214B cardinality=29.80K
 |
 06:SORT
-|  order by: i_class ASC NULLS FIRST
+|  order by: i_class ASC NULLS LAST
 |  row-size=198B cardinality=29.80K
 |
 05:AGGREGATE [FINALIZE]
@@ -5565,7 +5565,7 @@ PLAN-ROOT SINK
 |  row-size=214B cardinality=29.80K
 |
 06:SORT
-|  order by: i_class ASC NULLS FIRST
+|  order by: i_class ASC NULLS LAST
 |  row-size=198B cardinality=29.80K
 |
 13:AGGREGATE [FINALIZE]
@@ -5630,7 +5630,7 @@ PLAN-ROOT SINK
 |  row-size=214B cardinality=29.80K
 |
 06:SORT
-|  order by: i_class ASC NULLS FIRST
+|  order by: i_class ASC NULLS LAST
 |  row-size=198B cardinality=29.80K
 |
 13:AGGREGATE [FINALIZE]
@@ -6149,7 +6149,7 @@ PLAN-ROOT SINK
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  31:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  |  row-size=105B cardinality=685.36K
 |  |
 |  30:AGGREGATE [FINALIZE]
@@ -6204,7 +6204,7 @@ PLAN-ROOT SINK
 |  |  row-size=129B cardinality=685.36K
 |  |
 |  10:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC NULLS FIRST
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC NULLS LAST
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  09:ANALYTIC
@@ -6215,7 +6215,7 @@ PLAN-ROOT SINK
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  08:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  |  row-size=105B cardinality=685.36K
 |  |
 |  07:AGGREGATE [FINALIZE]
@@ -6264,7 +6264,7 @@ PLAN-ROOT SINK
 |  row-size=113B cardinality=685.36K
 |
 21:SORT
-|  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  row-size=105B cardinality=685.36K
 |
 20:AGGREGATE [FINALIZE]
@@ -6331,7 +6331,7 @@ PLAN-ROOT SINK
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  31:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  |  row-size=105B cardinality=685.36K
 |  |
 |  51:AGGREGATE [FINALIZE]
@@ -6403,7 +6403,7 @@ PLAN-ROOT SINK
 |  |  row-size=129B cardinality=685.36K
 |  |
 |  10:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC NULLS FIRST
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC NULLS LAST
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  09:ANALYTIC
@@ -6414,7 +6414,7 @@ PLAN-ROOT SINK
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  08:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  |  row-size=105B cardinality=685.36K
 |  |
 |  45:AGGREGATE [FINALIZE]
@@ -6476,7 +6476,7 @@ PLAN-ROOT SINK
 |  row-size=113B cardinality=685.36K
 |
 21:SORT
-|  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  row-size=105B cardinality=685.36K
 |
 40:AGGREGATE [FINALIZE]
@@ -6560,7 +6560,7 @@ PLAN-ROOT SINK
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  31:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  |  row-size=105B cardinality=685.36K
 |  |
 |  51:AGGREGATE [FINALIZE]
@@ -6648,7 +6648,7 @@ PLAN-ROOT SINK
 |  |  row-size=129B cardinality=685.36K
 |  |
 |  10:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC NULLS FIRST
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC NULLS LAST
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  09:ANALYTIC
@@ -6659,7 +6659,7 @@ PLAN-ROOT SINK
 |  |  row-size=113B cardinality=685.36K
 |  |
 |  08:SORT
-|  |  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  |  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  |  row-size=105B cardinality=685.36K
 |  |
 |  45:AGGREGATE [FINALIZE]
@@ -6733,7 +6733,7 @@ PLAN-ROOT SINK
 |  row-size=113B cardinality=685.36K
 |
 21:SORT
-|  order by: i_category ASC NULLS FIRST, i_brand ASC NULLS FIRST, s_store_name ASC NULLS FIRST, s_company_name ASC NULLS FIRST, d_year ASC, d_moy ASC
+|  order by: i_category ASC NULLS LAST, i_brand ASC NULLS LAST, s_store_name ASC NULLS LAST, s_company_name ASC NULLS LAST, d_year ASC, d_moy ASC
 |  row-size=105B cardinality=685.36K
 |
 40:AGGREGATE [FINALIZE]
@@ -10940,18 +10940,18 @@ PLAN-ROOT SINK
 |
 11:SELECT
 |  predicates: rank() <= 100
-|  row-size=100B cardinality=1.51M
+|  row-size=100B cardinality=100
 |
 10:ANALYTIC
 |  functions: rank()
 |  partition by: i_category
 |  order by: aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 6 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 7 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 8 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 9 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 10 THEN su [...]
 |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  row-size=100B cardinality=15.09M
+|  row-size=100B cardinality=100
 |
-09:SORT
-|  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END ASC NULLS FIRST, aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_qua [...]
-|  row-size=92B cardinality=15.09M
+09:TOP-N [LIMIT=100]
+|  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END ASC NULLS LAST, aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quan [...]
+|  row-size=92B cardinality=100
 |
 08:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 6 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 7 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 8 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 9 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 10 THEN sum( [...]
@@ -11021,11 +11021,11 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> ss_item_sk, RF002 -> ss_store_sk, RF004 -> ss_sold_date_sk
    row-size=24B cardinality=2.88M
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=505.51MB Threads=11
-Per-Host Resource Estimates: Memory=53.08GB
+Max Per-Host Resource Reservation: Memory=493.51MB Threads=11
+Per-Host Resource Estimates: Memory=53.04GB
 PLAN-ROOT SINK
 |
-19:MERGING-EXCHANGE [UNPARTITIONED]
+20:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: i_category ASC, i_class ASC, i_brand ASC, i_product_name ASC, d_year ASC, d_qoy ASC, d_moy ASC, s_store_id ASC, sumsales ASC, rk ASC
 |  limit: 100
 |
@@ -11035,21 +11035,25 @@ PLAN-ROOT SINK
 |
 11:SELECT
 |  predicates: rank() <= 100
-|  row-size=100B cardinality=1.51M
+|  row-size=100B cardinality=100
 |
 10:ANALYTIC
 |  functions: rank()
 |  partition by: i_category
 |  order by: aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 6 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 7 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 8 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 9 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 10 THEN su [...]
 |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  row-size=100B cardinality=15.09M
+|  row-size=100B cardinality=100
 |
-09:SORT
-|  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END ASC NULLS FIRST, aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_qua [...]
-|  row-size=92B cardinality=15.09M
+19:TOP-N [LIMIT=100]
+|  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END ASC NULLS LAST, aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quan [...]
+|  row-size=92B cardinality=100
 |
 18:EXCHANGE [HASH(CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END)]
 |
+09:TOP-N [LIMIT=100]
+|  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END ASC NULLS LAST, aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quan [...]
+|  row-size=92B cardinality=100
+|
 08:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 6 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 7 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 8 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 9 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 10 THEN sum( [...]
 |  group by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END, CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_class WHEN 5 THEN i_class WHEN 6 THEN i_class WHEN 7 THEN i_class WHEN 8 THEN i_class WHEN 9 THEN i_class WHEN 10 THEN i_class WHEN 11 THEN NULL WHEN 12 THEN NULL END, CASE valid_tid [...]
@@ -12267,7 +12271,7 @@ PLAN-ROOT SINK
 |  row-size=54B cardinality=3
 |
 17:SORT
-|  order by: aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 0 WHEN 13 THEN 1 END) + aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) ASC NULLS FIRST, CASE WHEN aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) = 0 THEN CASE valid_tid(11,12,13) WHEN 11 THEN s_state WHEN 12 THEN s_state WHEN 13 THEN NULL EN [...]
+|  order by: aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 0 WHEN 13 THEN 1 END) + aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) ASC NULLS LAST, CASE WHEN aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) = 0 THEN CASE valid_tid(11,12,13) WHEN 11 THEN s_state WHEN 12 THEN s_state WHEN 13 THEN NULL END [...]
 |  row-size=46B cardinality=3
 |
 16:AGGREGATE [FINALIZE]
@@ -12304,7 +12308,7 @@ PLAN-ROOT SINK
 |  |  row-size=38B cardinality=1
 |  |
 |  09:SORT
-|  |  order by: s_state ASC NULLS FIRST, sum(ss_net_profit) DESC
+|  |  order by: s_state ASC NULLS LAST, sum(ss_net_profit) DESC
 |  |  row-size=30B cardinality=1
 |  |
 |  08:AGGREGATE [FINALIZE]
@@ -12381,7 +12385,7 @@ PLAN-ROOT SINK
 |  row-size=54B cardinality=3
 |
 17:SORT
-|  order by: aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 0 WHEN 13 THEN 1 END) + aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) ASC NULLS FIRST, CASE WHEN aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) = 0 THEN CASE valid_tid(11,12,13) WHEN 11 THEN s_state WHEN 12 THEN s_state WHEN 13 THEN NULL EN [...]
+|  order by: aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 0 WHEN 13 THEN 1 END) + aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) ASC NULLS LAST, CASE WHEN aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) = 0 THEN CASE valid_tid(11,12,13) WHEN 11 THEN s_state WHEN 12 THEN s_state WHEN 13 THEN NULL END [...]
 |  row-size=46B cardinality=3
 |
 29:EXCHANGE [HASH(aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 0 WHEN 13 THEN 1 END) + aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END),CASE WHEN aggif(valid_tid(11,12,13) IN (11, 12, 13), CASE valid_tid(11,12,13) WHEN 11 THEN 0 WHEN 12 THEN 1 WHEN 13 THEN 1 END) = 0 THEN CASE valid_tid(11,12,13) WHEN 11 THEN s_state WHEN 12 THEN s_state WHEN 13 THEN NULL END END)]
@@ -12436,7 +12440,7 @@ PLAN-ROOT SINK
 |  |  row-size=38B cardinality=1
 |  |
 |  09:SORT
-|  |  order by: s_state ASC NULLS FIRST, sum(ss_net_profit) DESC
+|  |  order by: s_state ASC NULLS LAST, sum(ss_net_profit) DESC
 |  |  row-size=30B cardinality=1
 |  |
 |  25:AGGREGATE [FINALIZE]
@@ -13115,7 +13119,7 @@ PLAN-ROOT SINK
 |  row-size=54B cardinality=1.00K
 |
 07:SORT
-|  order by: aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 0 WHEN 5 THEN 1 END) + aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) ASC NULLS FIRST, CASE WHEN aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) = 0 THEN CASE valid_tid(3,4,5) WHEN 3 THEN i_category WHEN 4 THEN i_category WHEN 5 THEN NULL END END ASC NULLS FIRST, aggif(valid_t [...]
+|  order by: aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 0 WHEN 5 THEN 1 END) + aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) ASC NULLS LAST, CASE WHEN aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) = 0 THEN CASE valid_tid(3,4,5) WHEN 3 THEN i_category WHEN 4 THEN i_category WHEN 5 THEN NULL END END ASC NULLS LAST, aggif(valid_tid [...]
 |  row-size=46B cardinality=1.00K
 |
 06:AGGREGATE [FINALIZE]
@@ -13179,7 +13183,7 @@ PLAN-ROOT SINK
 |  row-size=54B cardinality=1.00K
 |
 07:SORT
-|  order by: aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 0 WHEN 5 THEN 1 END) + aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) ASC NULLS FIRST, CASE WHEN aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) = 0 THEN CASE valid_tid(3,4,5) WHEN 3 THEN i_category WHEN 4 THEN i_category WHEN 5 THEN NULL END END ASC NULLS FIRST, aggif(valid_t [...]
+|  order by: aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 0 WHEN 5 THEN 1 END) + aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) ASC NULLS LAST, CASE WHEN aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) = 0 THEN CASE valid_tid(3,4,5) WHEN 3 THEN i_category WHEN 4 THEN i_category WHEN 5 THEN NULL END END ASC NULLS LAST, aggif(valid_tid [...]
 |  row-size=46B cardinality=1.00K
 |
 14:EXCHANGE [HASH(aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 0 WHEN 5 THEN 1 END) + aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END),CASE WHEN aggif(valid_tid(3,4,5) IN (3, 4, 5), CASE valid_tid(3,4,5) WHEN 3 THEN 0 WHEN 4 THEN 1 WHEN 5 THEN 1 END) = 0 THEN CASE valid_tid(3,4,5) WHEN 3 THEN i_category WHEN 4 THEN i_category WHEN 5 THEN NULL END END)]
@@ -13475,7 +13479,7 @@ PLAN-ROOT SINK
 |  row-size=70B cardinality=1.00K
 |
 09:SORT
-|  order by: aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 0 WHEN 6 THEN 1 END) + aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) ASC NULLS FIRST, CASE WHEN aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) = 0 THEN CASE valid_tid(4,5,6) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN NULL END END ASC NULLS FIRST, aggif(valid_t [...]
+|  order by: aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 0 WHEN 6 THEN 1 END) + aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) ASC NULLS LAST, CASE WHEN aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) = 0 THEN CASE valid_tid(4,5,6) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN NULL END END ASC NULLS LAST, aggif(valid_tid [...]
 |  row-size=62B cardinality=1.00K
 |
 08:AGGREGATE [FINALIZE]
@@ -13549,7 +13553,7 @@ PLAN-ROOT SINK
 |  row-size=70B cardinality=1.00K
 |
 09:SORT
-|  order by: aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 0 WHEN 6 THEN 1 END) + aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) ASC NULLS FIRST, CASE WHEN aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) = 0 THEN CASE valid_tid(4,5,6) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN NULL END END ASC NULLS FIRST, aggif(valid_t [...]
+|  order by: aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 0 WHEN 6 THEN 1 END) + aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) ASC NULLS LAST, CASE WHEN aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) = 0 THEN CASE valid_tid(4,5,6) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN NULL END END ASC NULLS LAST, aggif(valid_tid [...]
 |  row-size=62B cardinality=1.00K
 |
 17:EXCHANGE [HASH(aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 0 WHEN 6 THEN 1 END) + aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END),CASE WHEN aggif(valid_tid(4,5,6) IN (4, 5, 6), CASE valid_tid(4,5,6) WHEN 4 THEN 0 WHEN 5 THEN 1 WHEN 6 THEN 1 END) = 0 THEN CASE valid_tid(4,5,6) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN NULL END END)]
@@ -13624,8 +13628,8 @@ PLAN-ROOT SINK
    HDFS partitions=1824/1824 files=1824 size=346.60MB
    runtime filters: RF000 -> ss_store_sk, RF002 -> ss_item_sk, RF004 -> ss_sold_date_sk
    row-size=24B cardinality=2.88M
-=======
----- QUERY: TPCDS-Q38
+====
+# TPCDS-Q38
 SELECT count(*)
 FROM
   (SELECT DISTINCT c_last_name,
@@ -13899,7 +13903,7 @@ PLAN-ROOT SINK
    runtime filters: RF012 -> store_sales.ss_customer_sk, RF014 -> store_sales.ss_sold_date_sk
    row-size=8B cardinality=2.88M
 ====
----- QUERY: TPCDS-Q87
+# TPCDS-Q87
 SELECT count(*)
 FROM ((SELECT DISTINCT c_last_name,
                          c_first_name,
diff --git a/testdata/workloads/tpch/queries/limit-pushdown-analytic.test b/testdata/workloads/tpch/queries/limit-pushdown-analytic.test
new file mode 100644
index 0000000..33e2b50
--- /dev/null
+++ b/testdata/workloads/tpch/queries/limit-pushdown-analytic.test
@@ -0,0 +1,74 @@
+====
+---- QUERY
+# IMPALA-9983
+# Base case. Limit pushdown into analytic sort should be applied
+select * from (
+  select l_partkey, l_quantity, l_orderkey,
+    rank() over (partition by l_partkey order by l_orderkey desc) rk
+  from lineitem) dt
+where rk <= 10
+order by l_partkey, l_quantity, l_orderkey, rk
+limit 10
+---- RESULTS
+1,16.00,4464900,9
+1,20.00,4450401,10
+1,21.00,5618244,3
+1,30.00,4912929,7
+1,32.00,5926723,1
+1,33.00,5362629,4
+1,37.00,4810753,8
+1,38.00,5862498,2
+1,40.00,5352710,5
+1,42.00,5120486,6
+---- TYPES
+BIGINT, DECIMAL, BIGINT, BIGINT
+====
+---- QUERY
+# Multi-column partition-by that is a prefix of the sort exprs.
+# Limit pushdown into analytic sort should be applied
+select * from (
+  select l_partkey, l_quantity, l_orderkey,
+    rank() over (partition by l_partkey, l_quantity
+                 order by l_orderkey desc) rk
+  from lineitem) dt
+where rk <= 10
+order by l_partkey, l_quantity, l_orderkey, rk
+limit 10
+---- RESULTS
+1,1.00,599361,1
+1,8.00,3438019,1
+1,11.00,4422049,1
+1,14.00,2535332,1
+1,15.00,545153,1
+1,16.00,1139363,3
+1,16.00,1695463,2
+1,16.00,4464900,1
+1,19.00,1876199,1
+1,20.00,4450401,1
+---- TYPES
+BIGINT, DECIMAL, BIGINT, BIGINT
+====
+---- QUERY
+# No predicate on the analytic function's output.
+# Limit pushdown should still be applied
+select * from (
+  select l_partkey, l_quantity, l_orderkey,
+    rank() over (partition by l_partkey, l_quantity
+                 order by l_orderkey desc) rk
+  from lineitem) dt
+order by l_partkey, l_quantity, l_orderkey, rk
+limit 10
+---- RESULTS
+1,1.00,599361,1
+1,8.00,3438019,1
+1,11.00,4422049,1
+1,14.00,2535332,1
+1,15.00,545153,1
+1,16.00,1139363,3
+1,16.00,1695463,2
+1,16.00,4464900,1
+1,19.00,1876199,1
+1,20.00,4450401,1
+---- TYPES
+BIGINT, DECIMAL, BIGINT, BIGINT
+====
diff --git a/tests/query_test/test_limit_pushdown_analytic.py b/tests/query_test/test_limit_pushdown_analytic.py
new file mode 100644
index 0000000..dca9c6b
--- /dev/null
+++ b/tests/query_test/test_limit_pushdown_analytic.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test the limit pushdown to analytic sort in the presence
+# of ranking functions
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+class TestLimitPushdownAnalytic(ImpalaTestSuite):
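+  """End-to-end tests for IMPALA-9983: limit pushdown to the analytic sort."""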
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestLimitPushdownAnalytic, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['parquet'])
+
+  def test_limit_pushdown_analytic(self, vector):
+    self.run_test_case('limit-pushdown-analytic', vector)