You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/11/07 18:54:38 UTC

[2/3] impala git commit: IMPALA-5004: Switch to sorting node for large TopN queries

IMPALA-5004: Switch to sorting node for large TopN queries

Adds a new query option 'topn_bytes_limit' that places a limit on the
number of estimated bytes that a TopN operator can process. If the
Impala planner estimates that a TopN operator will process more bytes
than this limit, it will replace the TopN operator with a sort operator.

Since the TopN operator cannot spill to disk, it has to buffer everything
in memory. This can cause frequent OOM issues when running with a large
limit + offset. Switching to a sort operator allows Impala to spill to
disk. We prefer to use the TopN operator when possible as it has better
performance than the sort operator for 'order by limit [offset]' queries.

The default limit is set to 512MB and is based on micro-benchmarking the
topn vs. sort operator for various limits (see the JIRA for full details).
The default is set to an intentionally high value in order to avoid
performance regressions.

Testing:

* Added new planner tests to functional-planner/ to validate that
'topn_bytes_limit' properly switches between topn and sort operators.

Change-Id: I34c9db33c9302b55e9978f53f9c7061f2806c8a9
Reviewed-on: http://gerrit.cloudera.org:8080/11698
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/98d92324
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/98d92324
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/98d92324

Branch: refs/heads/master
Commit: 98d923243f8bc95d66e497f4d8a15db57af32663
Parents: 5baa289
Author: stakiar <ta...@gmail.com>
Authored: Thu Oct 4 16:50:53 2018 -0500
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed Nov 7 01:17:34 2018 +0000

----------------------------------------------------------------------
 be/src/service/query-options-test.cc            |  1 +
 be/src/service/query-options.cc                 |  6 ++
 be/src/service/query-options.h                  |  3 +-
 common/thrift/ImpalaInternalService.thrift      |  4 ++
 common/thrift/ImpalaService.thrift              |  6 ++
 .../org/apache/impala/analysis/SortInfo.java    | 14 ++++
 .../apache/impala/planner/AggregationNode.java  |  2 +-
 .../apache/impala/planner/AnalyticEvalNode.java |  2 +-
 .../impala/planner/DataSourceScanNode.java      |  2 +-
 .../impala/planner/DistributedPlanner.java      |  2 +-
 .../apache/impala/planner/HBaseScanNode.java    |  2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  2 +-
 .../org/apache/impala/planner/JoinNode.java     |  2 +-
 .../org/apache/impala/planner/KuduScanNode.java |  2 +-
 .../org/apache/impala/planner/PlanNode.java     | 12 ++--
 .../org/apache/impala/planner/SelectNode.java   |  2 +-
 .../impala/planner/SingleNodePlanner.java       | 54 +++++++++++----
 .../org/apache/impala/planner/SortNode.java     |  7 +-
 .../org/apache/impala/planner/SubplanNode.java  |  2 +-
 .../org/apache/impala/planner/UnionNode.java    |  2 +-
 .../org/apache/impala/planner/UnnestNode.java   |  2 +-
 .../org/apache/impala/planner/PlannerTest.java  | 16 ++++-
 .../PlannerTest/topn-bytes-limit-small.test     | 72 ++++++++++++++++++++
 .../queries/PlannerTest/topn-bytes-limit.test   | 23 +++++++
 .../QueryTest/spilling-no-debug-action.test     |  1 +
 25 files changed, 205 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 114d830..059b355 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -144,6 +144,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(compute_stats_min_sample_size), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_mem_estimate_for_admission), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(topn_bytes_limit), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 1424896..bb49762 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -714,6 +714,12 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_cpu_limit_s(cpu_limit_s);
         break;
       }
+      case TImpalaQueryOptions::TOPN_BYTES_LIMIT: {
+        int64_t topn_bytes_limit;
+        RETURN_IF_ERROR(ParseMemValue(value, "topn bytes limit", &topn_bytes_limit));
+        query_options->__set_topn_bytes_limit(topn_bytes_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 46cdf05..95263f7 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::CPU_LIMIT_S + 1);\
+      TImpalaQueryOptions::TOPN_BYTES_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -143,6 +143,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
+  QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index e517f3e..821cdd8 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -304,6 +304,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   72: optional i64 cpu_limit_s = 0;
+
+  // See comment in ImpalaService.thrift
+  // The default value is set to 512MB based on empirical data
+  73: optional i64 topn_bytes_limit = 536870912;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 05a1431..758617d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -339,6 +339,12 @@ enum TImpalaQueryOptions {
   // Note that until IMPALA-7318 is fixed, CPU usage can be very stale and this may not
   // terminate queries soon enough.
   CPU_LIMIT_S,
+
+  // The max number of estimated bytes a TopN operator is allowed to materialize. If the
+  // planner thinks a TopN operator will exceed this limit, it falls back to a TotalSort
+  // operator which is capable of spilling to disk (unlike the TopN operator, which keeps
+  // everything in memory). 0 or -1 means this has no effect.
+  TOPN_BYTES_LIMIT,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index fba7286..8472725 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -16,11 +16,13 @@
 // under the License.
 
 package org.apache.impala.analysis;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.impala.common.TreeNode;
+import org.apache.impala.planner.PlanNode;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -233,6 +235,18 @@ public class SortInfo {
   }
 
   /**
+   * Estimates the size of the data materialized in memory by the TopN operator. The
+   * method uses the formula <code>estimatedSize = estimated # of rows in memory *
+   * average tuple serialized size</code>. 'cardinality' is the cardinality of the TopN
+   * operator and 'offset' is the value in the 'OFFSET [x]' clause.
+   */
+  public long estimateTopNMaterializedSize(long cardinality, long offset) {
+    getSortTupleDescriptor().computeMemLayout();
+    return (long) Math.ceil(getSortTupleDescriptor().getAvgSerializedSize()
+        * (PlanNode.checkedAdd(cardinality, offset)));
+  }
+
+  /**
    * Returns the subset of 'sortExprs_' that should be materialized. A sort expr is
    * is materialized if it:
    * - contains a non-deterministic expr

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index ab234e4..72694ca 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -249,7 +249,7 @@ public class AggregationNode extends PlanNode {
         cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_);
       }
     }
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 5ca666c..86e5166 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -138,7 +138,7 @@ public class AnalyticEvalNode extends PlanNode {
   protected void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     cardinality_ = getChild(0).cardinality_;
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 1ddb394..d4740e8 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -268,7 +268,7 @@ public class DataSourceScanNode extends ScanNode {
     cardinality_ = numRowsEstimate_;
     cardinality_ *= computeSelectivity();
     cardinality_ = Math.max(1, cardinality_);
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("computeStats DataSourceScan: cardinality=" + Long.toString(cardinality_));

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index ac3115b..05563b0 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -1054,7 +1054,7 @@ public class DistributedPlanner {
     Preconditions.checkState(node == childSortNode);
     if (hasLimit) {
       childSortNode.unsetLimit();
-      childSortNode.setLimit(limit + offset);
+      childSortNode.setLimit(PlanNode.checkedAdd(limit, offset));
     }
     childSortNode.setOffset(0);
     childSortNode.computeStats(ctx_.getRootAnalyzer());

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 13ecb6a..1b60ad4 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -226,7 +226,7 @@ public class HBaseScanNode extends ScanNode {
 
     cardinality_ *= computeSelectivity();
     cardinality_ = Math.max(1, cardinality_);
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0246d8c..c1ff092 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1054,7 +1054,7 @@ public class HdfsScanNode extends ScanNode {
       // IMPALA-2165: Avoid setting the cardinality to 0 after rounding.
       cardinality_ = Math.max(cardinality_, 1);
     }
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index e8adcfa..cc50318 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -617,7 +617,7 @@ public abstract class JoinNode extends PlanNode {
         break;
       }
     }
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
     Preconditions.checkState(hasValidStats());
     if (LOG.isTraceEnabled()) {
       LOG.trace("stats Join: cardinality=" + Long.toString(cardinality_));

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index adeaa72..fc1f371 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -265,7 +265,7 @@ public class KuduScanNode extends ScanNode {
     inputCardinality_ = cardinality_ = kuduTable_.getNumRows();
     cardinality_ *= computeSelectivity();
     cardinality_ = Math.min(Math.max(1, cardinality_), kuduTable_.getNumRows());
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("computeStats KuduScan: cardinality=" + Long.toString(cardinality_));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 744aa09..7751eeb 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -521,17 +521,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     if (!children_.isEmpty()) numNodes_ = getChild(0).numNodes_;
   }
 
-  protected long capAtLimit(long cardinality) {
+  protected long capCardinalityAtLimit(long cardinality) {
     if (hasLimit()) {
-      if (cardinality == -1) {
-        return limit_;
-      } else {
-        return Math.min(cardinality, limit_);
-      }
+      return capCardinalityAtLimit(cardinality, limit_);
     }
     return cardinality;
   }
 
+  static long capCardinalityAtLimit(long cardinality, long limit) {
+    return cardinality == -1 ? limit : Math.min(cardinality, limit);
+  }
+
   /**
    * Call computeMemLayout() for all materialized tuples.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/SelectNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index 3ffc975..7b637e6 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -75,7 +75,7 @@ public class SelectNode extends PlanNode {
           Math.round(((double) getChild(0).cardinality_) * computeSelectivity());
       Preconditions.checkState(cardinality_ >= 0);
     }
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("stats Select: cardinality=" + Long.toString(cardinality_));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 313597e..740cbfa 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -48,6 +48,7 @@ import org.apache.impala.analysis.SingularRowSrcTableRef;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
@@ -294,20 +295,8 @@ public class SingleNodePlanner {
     }
 
     if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
-      long limit = stmt.getLimit();
-      // TODO: External sort could be used for very large limits
-      // not just unlimited order-by
-      boolean useTopN = stmt.hasLimit() && !disableTopN;
-      if (useTopN) {
-        root = SortNode.createTopNSortNode(
-            ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset());
-      } else {
-        root = SortNode.createTotalSortNode(
-            ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset());
-      }
-      Preconditions.checkState(root.hasValidStats());
-      root.setLimit(limit);
-      root.init(analyzer);
+      root = createSortNode(analyzer, root, stmt.getSortInfo(), stmt.getLimit(),
+          stmt.getOffset(), stmt.hasLimit(), disableTopN);
     } else {
       root.setLimit(stmt.getLimit());
       root.computeStats(analyzer);
@@ -317,6 +306,43 @@ public class SingleNodePlanner {
   }
 
   /**
+   * Creates and initializes a SortNode that is either a TopN sort or a total sort,
+   * depending on 'hasLimit', 'disableTopN' and the topn_bytes_limit query option.
+   */
+  private SortNode createSortNode(Analyzer analyzer, PlanNode root, SortInfo sortInfo,
+      long limit, long offset, boolean hasLimit, boolean disableTopN)
+      throws ImpalaException {
+    SortNode sortNode;
+    long topNBytesLimit = ctx_.getQueryOptions().topn_bytes_limit;
+
+    if (hasLimit && !disableTopN) {
+      if (topNBytesLimit <= 0) {
+        sortNode =
+            SortNode.createTopNSortNode(ctx_.getNextNodeId(), root, sortInfo, offset);
+      } else {
+        long topNCardinality = PlanNode.capCardinalityAtLimit(root.cardinality_, limit);
+        long estimatedTopNMaterializedSize =
+            sortInfo.estimateTopNMaterializedSize(topNCardinality, offset);
+
+        if (estimatedTopNMaterializedSize < topNBytesLimit) {
+          sortNode =
+              SortNode.createTopNSortNode(ctx_.getNextNodeId(), root, sortInfo, offset);
+        } else {
+          sortNode =
+              SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, offset);
+        }
+      }
+    } else {
+      sortNode =
+          SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, offset);
+    }
+    Preconditions.checkState(sortNode.hasValidStats());
+    sortNode.setLimit(limit);
+    sortNode.init(analyzer);
+    return sortNode;
+  }
+
+  /**
    * If there are unassigned conjuncts that are bound by tupleIds or if there are slot
    * equivalences for tupleIds that have not yet been enforced, returns a SelectNode on
    * top of root that evaluates those conjuncts; otherwise returns root unchanged.

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index ccef721..e23b933 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -164,7 +164,7 @@ public class SortNode extends PlanNode {
   @Override
   protected void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
-    cardinality_ = capAtLimit(getChild(0).cardinality_);
+    cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("stats Sort: cardinality=" + Long.toString(cardinality_));
     }
@@ -257,9 +257,8 @@ public class SortNode extends PlanNode {
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkState(hasValidStats());
     if (type_ == TSortType.TOPN) {
-      long perInstanceMemEstimate =
-              (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
-      nodeResourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
+      nodeResourceProfile_ = ResourceProfile.noReservation(
+          getSortInfo().estimateTopNMaterializedSize(cardinality_, offset_));
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index a22c397..5b1e323 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -91,7 +91,7 @@ public class SubplanNode extends PlanNode {
     } else {
       cardinality_ = -1;
     }
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/UnionNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index a8cda0d..00a6d20 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -124,7 +124,7 @@ public class UnionNode extends PlanNode {
     // are inline views (e.g. select 1 FROM (VALUES(1 x, 1 y)) a FULL OUTER JOIN
     // (VALUES(1 x, 1 y)) b ON (a.x = b.y)). We need to set the correct value.
     if (numNodes_ == -1) numNodes_ = 1;
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("stats Union: cardinality=" + Long.toString(cardinality_));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index 7e0a87e..857a949 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -70,7 +70,7 @@ public class UnnestNode extends PlanNode {
     // The containing SubplanNode has not yet been initialized, so get the number
     // of nodes from the SubplanNode's input.
     numNodes_ = containingSubplanNode_.getChild(0).getNumNodes();
-    cardinality_ = capAtLimit(cardinality_);
+    cardinality_ = capCardinalityAtLimit(cardinality_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index e8e4fb8..2ec4b15 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -193,7 +193,21 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testTopN() {
-    runPlannerTestFile("topn");
+    TQueryOptions options = new TQueryOptions();
+    options.setTopn_bytes_limit(0);
+    runPlannerTestFile("topn", options);
+  }
+
+  @Test
+  public void testTopNBytesLimit() {
+    runPlannerTestFile("topn-bytes-limit");
+  }
+
+  @Test
+  public void testTopNBytesLimitSmall() {
+    TQueryOptions options = new TQueryOptions();
+    options.setTopn_bytes_limit(6);
+    runPlannerTestFile("topn-bytes-limit-small", options);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
new file mode 100644
index 0000000..db3e9a5
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test
@@ -0,0 +1,72 @@
+# topn_bytes_limit is set to 6 so a limit of 1 will return a single int
+# a single int is 4 bytes, which is under the limit of 6 so a TOP-N should be triggered
+select int_col from functional.alltypes order by 1 limit 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:TOP-N [LIMIT=1]
+|  order by: int_col ASC
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC
+|  limit: 1
+|
+01:TOP-N [LIMIT=1]
+|  order by: int_col ASC
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# returns 2 ints, with a total size of 8 bytes, which exceeds the limit of 6 and thus triggers a SORT
+select int_col from functional.alltypes order by 1 limit 2
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SORT [LIMIT=2]
+|  order by: int_col ASC
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: int_col ASC
+|  limit: 2
+|
+01:SORT [LIMIT=2]
+|  order by: int_col ASC
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# test that offset is taken into account; the query only returns a single row but needs to sort two rows
+# sorting two ints requires 8 bytes of memory, which exceeds the threshold of 6
+select int_col from functional.alltypes order by 1 limit 1 offset 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SORT [LIMIT=1 OFFSET=1]
+|  order by: int_col ASC
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  offset: 1
+|  order by: int_col ASC
+|  limit: 1
+|
+01:SORT [LIMIT=2]
+|  order by: int_col ASC
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
new file mode 100644
index 0000000..9ad000e
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test
@@ -0,0 +1,23 @@
+# check that topn is triggered for low limits with the default value of topn_bytes_limit
+select id from functional.alltypestiny order by id limit 7
+---- PLAN
+PLAN-ROOT SINK
+|
+01:TOP-N [LIMIT=7]
+|  order by: id ASC
+|
+00:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: id ASC
+|  limit: 7
+|
+01:TOP-N [LIMIT=7]
+|  order by: id ASC
+|
+00:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/98d92324/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
index 7fe1c96..c82ea17 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
@@ -79,6 +79,7 @@ Memory limit exceeded
 ---- QUERY
 # Top-N query with large limit that will OOM because spilling is not implemented:
 # IMPALA-3471. It does not need any help from DEBUG_ACTION.
+set topn_bytes_limit=-1;
 set mem_limit=100m;
 select *
 from lineitem