Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 23:41:55 UTC

[04/15] impala git commit: IMPALA-6679, IMPALA-6678: reduce scan reservation

IMPALA-6679,IMPALA-6678: reduce scan reservation

This has two related changes.

IMPALA-6679: defer scanner reservation increases
------------------------------------------------
When starting each scan range, check to see how big the initial scan
range is (the full thing for row-based formats, the footer for
Parquet) and determine whether more reservation would be useful.

For Parquet, base the ideal reservation on the actual column layout
of each file. This avoids reserving memory that we won't use for
the actual files that we're scanning. This also avoids the need to
estimate ideal reservation in the planner.
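
As a rough illustration, the sketch below models the per-range "ideal"
reservation using the rounding rules that the ComputeIdealReservation test
in this commit exercises. Ranges smaller than the max buffer round up to a
power-of-two buffer, and larger ranges round up to a multiple of the max
buffer, capped at three buffers. The constants and function names are
hypothetical stand-ins, not DiskIoMgr's API. The Parquet case sums the
per-column values from the file's actual column chunk lengths, as
HdfsParquetScanner::ComputeIdealReservation() does.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Buffer sizes borrowed from hdfs-parquet-scanner-test.cc (64KB min, 8MB max).
constexpr int64_t kMinBuf = 64 * 1024;
constexpr int64_t kMaxBuf = 8 * 1024 * 1024;
// Assumed cap on max-sized buffers per range ("up to 3 buffers" in the test).
constexpr int64_t kMaxBuffersPerRange = 3;

// Hypothetical model of the ideal reservation for reading one range of 'len' bytes.
int64_t IdealRangeReservation(int64_t len) {
  if (len < kMaxBuf) {
    // Smallest power-of-two buffer >= len, never below the min buffer size.
    int64_t r = kMinBuf;
    while (r < len) r *= 2;
    return r;
  }
  // Round up to whole max-sized buffers, but don't ask for more than the cap.
  int64_t num_bufs = (len + kMaxBuf - 1) / kMaxBuf;
  return std::min(num_bufs, kMaxBuffersPerRange) * kMaxBuf;
}

// Parquet: the ideal reservation is the sum over the column chunks being scanned.
int64_t IdealParquetReservation(const std::vector<int64_t>& col_range_lengths) {
  int64_t total = 0;
  for (int64_t len : col_range_lengths) total += IdealRangeReservation(len);
  return total;
}
```

A scanner would then only request an increase when the ideal value exceeds
its current reservation. For example, a single 100MB column chunk maps to
3 x 8MB = 24MB of ideal reservation rather than 100MB.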

We also release scanner thread reservations above the minimum as
soon as threads complete, so that resources can be released slightly
earlier.
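
A minimal sketch of that release step, assuming the scan node holds a
single total reservation that must never drop below the planner-computed
minimum. HypotheticalNodeReservation and ReturnFromThread() are invented
names for illustration, not Impala's classes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical model of a scan node's reservation shared by its scanner threads.
struct HypotheticalNodeReservation {
  int64_t total;            // Reservation currently held by the scan node.
  int64_t min_reservation;  // Planner-computed minimum that must always be kept.

  // Called as soon as a scanner thread finishes. Releases up to 'thread_bytes'
  // (the thread's share) but never drops below 'min_reservation'.
  // Returns the number of bytes actually released.
  int64_t ReturnFromThread(int64_t thread_bytes) {
    int64_t releasable = std::max<int64_t>(0, total - min_reservation);
    int64_t released = std::min(thread_bytes, releasable);
    total -= released;
    return released;
  }
};
```

Releasing eagerly on thread exit, rather than when the whole scan node
closes, is what lets other operators or scanner threads pick up the memory
slightly earlier.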

IMPALA-6678: estimate Parquet column size for reservation
---------------------------------------------------------
This change also reduces reservation computed by the planner in certain
cases by estimating the on-disk size of column data based on stats. It
also reduces the default per-column reservation to 4MB, since columns
smaller than 8MB appear to be common in practice and the method for
estimating column size is biased towards over-estimating. There are two
main cases to consider for the performance implications:
* Memory is available to improve query perf - if we underestimate, we
  can increase the reservation so we can do "efficient" 8MB I/Os for
  large columns.
* The ideal reservation is not available - query performance is affected
  because we can't overlap I/O and compute as much and may do smaller
  (probably 4MB) I/Os. However, we should avoid pathological behaviour
  like tiny I/Os.

When stats are not available, we just default to reserving 4MB per
column, which typically is more memory than required. When stats are
available, the reservation can be reduced below 4MB when conservative
heuristics tell us with high confidence that the column data for most or
all files is smaller than 4MB.
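
The planner-side idea can be sketched as follows, assuming a stats-derived,
deliberately conservative estimate of each column's per-file on-disk size
is already available (how HdfsScanNode.java actually derives it is not
shown here). The 64KB floor borrows MIN_BUFFER_SIZE from
hdfs-parquet-scanner-test.cc, and all names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Assumed constants: 64KB min I/O buffer, 4MB default per-column reservation.
constexpr int64_t kMinBufferSize = 64 * 1024;
constexpr int64_t kDefaultColumnReservation = 4 * 1024 * 1024;

// Hypothetical helper. 'est_col_bytes' is an upper-bound estimate of one
// column's on-disk size per file, or std::nullopt when stats are missing.
int64_t PlannerColumnReservation(std::optional<int64_t> est_col_bytes) {
  // No stats: fall back to the 4MB default.
  if (!est_col_bytes.has_value()) return kDefaultColumnReservation;
  // Round the estimate up to a power-of-two buffer, clamped to [min, default].
  int64_t reservation = kMinBufferSize;
  while (reservation < *est_col_bytes && reservation < kDefaultColumnReservation) {
    reservation *= 2;
  }
  return reservation;
}

// Sum of per-column reservations for the materialized columns of a scan.
int64_t PlannerScanReservation(const std::vector<std::optional<int64_t>>& cols) {
  int64_t total = 0;
  for (const auto& est : cols) total += PlannerColumnReservation(est);
  return total;
}
```

Because the estimate is biased upwards and clamped at 4MB, an underestimate
should only matter when the runtime increase described above is also denied.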

The stats-based heuristic could reduce scan performance if the
conservative heuristics significantly underestimate the column size
and memory is also constrained such that we can't increase the scan
reservation at runtime (in which case the memory might be used by
a different operator or scanner thread).

Observability:
Added counters to track when scanner threads were not spawned due to
insufficient reservation and when reservation increases are requested
and denied. These make it possible to determine whether performance may
have been affected by memory availability.

Testing:
Updated test_mem_usage_scaling.py memory requirements and added steps
to regenerate the requirements. Looped the tests for a while to flush out
flakiness.

Added targeted planner and query tests for reservation calculations and
increases.

Change-Id: Ifc80e05118a9eef72cac8e2308418122e3ee0842
Reviewed-on: http://gerrit.cloudera.org:8080/9757
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/418c7057
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/418c7057
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/418c7057

Branch: refs/heads/master
Commit: 418c705787f060afb3e9e5fbeadb891b2484b6c5
Parents: 93d714c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Mar 16 13:27:46 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Sat Apr 28 23:41:39 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-orc-scanner.cc                 |   1 +
 be/src/exec/hdfs-parquet-scanner-test.cc        |  82 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  51 +-
 be/src/exec/hdfs-parquet-scanner.h              |  19 +-
 be/src/exec/hdfs-scan-node-base.cc              |  63 +-
 be/src/exec/hdfs-scan-node-base.h               |  30 +-
 be/src/exec/hdfs-scan-node-mt.cc                |  13 +-
 be/src/exec/hdfs-scan-node.cc                   | 104 +-
 be/src/exec/hdfs-scan-node.h                    |  56 +-
 be/src/exec/scanner-context.cc                  |   9 +-
 be/src/exec/scanner-context.h                   |  23 +-
 be/src/runtime/io/disk-io-mgr.cc                |  14 +
 be/src/runtime/io/disk-io-mgr.h                 |   4 +
 be/src/util/runtime-profile.cc                  |   9 +
 be/src/util/runtime-profile.h                   |   4 +
 common/thrift/PlanNodes.thrift                  |   3 -
 .../org/apache/impala/catalog/ColumnStats.java  |   1 +
 .../apache/impala/catalog/HdfsPartition.java    |   9 +
 .../org/apache/impala/planner/HdfsScanNode.java | 337 +++++--
 .../org/apache/impala/testutil/TestUtils.java   |  19 +
 testdata/bin/compute-table-stats.sh             |   2 +-
 .../queries/PlannerTest/constant-folding.test   |  32 +-
 .../queries/PlannerTest/disable-codegen.test    |   4 +-
 .../PlannerTest/fk-pk-join-detection.test       |  88 +-
 .../queries/PlannerTest/max-row-size.test       |  90 +-
 .../PlannerTest/min-max-runtime-filters.test    |   4 +-
 .../queries/PlannerTest/mt-dop-validation.test  |  48 +-
 .../queries/PlannerTest/parquet-filtering.test  |  52 +-
 .../queries/PlannerTest/partition-pruning.test  |   2 +-
 .../PlannerTest/resource-requirements.test      | 945 +++++++++++--------
 .../PlannerTest/sort-expr-materialization.test  |  18 +-
 .../PlannerTest/spillable-buffer-sizing.test    | 204 ++--
 .../queries/PlannerTest/tablesample.test        |  26 +-
 .../queries/PlannerTest/union.test              |  10 +-
 .../admission-reject-mem-estimate.test          |  69 +-
 .../admission-reject-min-reservation.test       |   6 +-
 .../queries/QueryTest/explain-level2.test       |   4 +-
 .../queries/QueryTest/explain-level3.test       |   4 +-
 .../queries/QueryTest/nested-types-tpch.test    |   4 +-
 .../queries/QueryTest/scanner-reservation.test  |  55 ++
 .../QueryTest/single-node-nlj-exhaustive.test   |   5 +-
 .../queries/QueryTest/spilling-aggs.test        |   8 +-
 .../queries/QueryTest/spilling.test             |   4 +-
 .../custom_cluster/test_admission_controller.py |   2 +-
 tests/query_test/test_mem_usage_scaling.py      |  15 +-
 tests/query_test/test_queries.py                |   3 -
 tests/query_test/test_scanners.py               |  44 +-
 tests/query_test/test_sort.py                   |   7 +-
 tests/stress/extract_min_mem.py                 |  49 +
 49 files changed, 1652 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-orc-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index f12230e..b755e7a 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -118,6 +118,7 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
     status = ExecEnv::GetInstance()->disk_io_mgr()->StartScanRange(
           scanner_->scan_node_->reader_context(), range, &needs_buffers);
     DCHECK(!status.ok() || !needs_buffers) << "Already provided a buffer";
+    if (status.ok()) status = range->GetNext(&io_buffer);
   }
   if (io_buffer != nullptr) range->ReturnBuffer(move(io_buffer));
   if (!status.ok()) throw ResourceError(status);

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
index cbc6e76..85fa3ef 100644
--- a/be/src/exec/hdfs-parquet-scanner-test.cc
+++ b/be/src/exec/hdfs-parquet-scanner-test.cc
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "exec/hdfs-parquet-scanner.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
 #include "common/names.h"
@@ -23,14 +25,87 @@
 static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
 static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
 
+DECLARE_int64(min_buffer_size);
+DECLARE_int32(read_size);
+
 namespace impala {
 
 class HdfsParquetScannerTest : public testing::Test {
+ public:
+  virtual void SetUp() {
+    // Override min/max buffer sizes picked up by DiskIoMgr.
+    FLAGS_min_buffer_size = MIN_BUFFER_SIZE;
+    FLAGS_read_size = MAX_BUFFER_SIZE;
+
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
+
+  virtual void TearDown() {
+    test_env_.reset();
+  }
+
  protected:
+  void TestComputeIdealReservation(const vector<int64_t>& col_range_lengths,
+      int64_t expected_ideal_reservation);
   void TestDivideReservation(const vector<int64_t>& col_range_lengths,
       int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
+
+  boost::scoped_ptr<TestEnv> test_env_;
 };
 
+/// Test the ComputeIdealReservation returns 'expected_ideal_reservation' for a list
+/// of columns with 'col_range_lengths'.
+void HdfsParquetScannerTest::TestComputeIdealReservation(
+    const vector<int64_t>& col_range_lengths, int64_t expected_ideal_reservation) {
+  EXPECT_EQ(expected_ideal_reservation,
+      HdfsParquetScanner::ComputeIdealReservation(col_range_lengths));
+}
+
+TEST_F(HdfsParquetScannerTest, ComputeIdealReservation) {
+  // Should round up to nearest power-of-two buffer size if < max scan range buffer.
+  TestComputeIdealReservation({0}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({1}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MIN_BUFFER_SIZE - 1}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MIN_BUFFER_SIZE}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MIN_BUFFER_SIZE + 2}, 2 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({4 * MIN_BUFFER_SIZE + 1234}, 8 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE - 10}, MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE}, MAX_BUFFER_SIZE);
+
+  // Should round to nearest max I/O buffer size if >= max scan range buffer, up to 3
+  // buffers.
+  TestComputeIdealReservation({MAX_BUFFER_SIZE + 1}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 2 - 1}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 2}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 2 + 1}, 3 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 3 + 1}, 3 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 100 + 27}, 3 * MAX_BUFFER_SIZE);
+
+  // Ideal reservations from multiple ranges are simply added together.
+  TestComputeIdealReservation({1, 2}, 2 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE, MAX_BUFFER_SIZE - 1}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE, MIN_BUFFER_SIZE + 1}, MAX_BUFFER_SIZE + 2 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE, MAX_BUFFER_SIZE * 128}, 4 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE * 7, MAX_BUFFER_SIZE * 128, MAX_BUFFER_SIZE * 1000},
+      3L * 3L * MAX_BUFFER_SIZE);
+
+  // Test col size that doesn't fit in int32.
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 1024L}, 3L * MAX_BUFFER_SIZE);
+
+  // Test sum of reservations that doesn't fit in int32.
+  vector<int64_t> col_range_lengths;
+  const int64_t LARGE_NUM_RANGES = 10000;
+  for (int i = 0; i < LARGE_NUM_RANGES; ++i) {
+    col_range_lengths.push_back(4 * MAX_BUFFER_SIZE);
+  }
+  TestComputeIdealReservation(col_range_lengths, LARGE_NUM_RANGES * 3L * MAX_BUFFER_SIZE);
+}
+
 /// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
 /// inputs 'col_range_lengths' and 'total_col_reservation'.
 void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
@@ -93,4 +168,9 @@ TEST_F(HdfsParquetScannerTest, DivideReservation) {
 
 }
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 8f9d8ca..b40816d 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -53,14 +53,23 @@ const int16_t HdfsParquetScanner::INVALID_POS;
 
 const char* HdfsParquetScanner::LLVM_CLASS_NAME = "class.impala::HdfsParquetScanner";
 
-const string PARQUET_MEM_LIMIT_EXCEEDED =
+static const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
 
 namespace impala {
 
+static const string IDEAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupIdealReservation";
+static const string ACTUAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupActualReservation";
+
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   DCHECK(!files.empty());
+  // Add Parquet-specific counters.
+  ADD_SUMMARY_STATS_COUNTER(
+      scan_node->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+  ADD_SUMMARY_STATS_COUNTER(
+      scan_node->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+
   for (HdfsFileDesc* file : files) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
     if (file->file_length < 12) {
@@ -161,14 +170,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
-  int64_t stream_reservation = stream_->reservation();
   stream_ = nullptr;
   context_->ReleaseCompletedResources(true);
   context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
-  // The scanner-wide stream was used only to read the file footer.  Each column has added
-  // its own stream. We can use the reservation from 'stream_' for the columns now.
-  total_col_reservation_ = stream_reservation;
 
   // Parse the file schema into an internal representation for schema resolution.
   schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -1469,39 +1474,61 @@ Status HdfsParquetScanner::InitScalarColumns() {
     }
     RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
   }
-  RETURN_IF_ERROR(
-      DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
+  RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_));
   return Status::OK();
 }
 
 Status HdfsParquetScanner::DivideReservationBetweenColumns(
-    const vector<BaseScalarColumnReader*>& column_readers,
-    int64_t reservation_to_distribute) {
+    const vector<BaseScalarColumnReader*>& column_readers) {
   DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   const int64_t min_buffer_size = io_mgr->min_buffer_size();
   const int64_t max_buffer_size = io_mgr->max_buffer_size();
   // The HdfsScanNode reservation calculation in the planner ensures that we have
   // reservation for at least one buffer per column.
-  if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
+  if (context_->total_reservation() < min_buffer_size * column_readers.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
                    "least $1 bytes per column for $2 columns but had $3 bytes",
             filename(), min_buffer_size, column_readers.size(),
-            reservation_to_distribute));
+            context_->total_reservation()));
   }
 
   vector<int64_t> col_range_lengths(column_readers.size());
   for (int i = 0; i < column_readers.size(); ++i) {
     col_range_lengths[i] = column_readers[i]->scan_range()->len();
   }
+
+  // The scanner-wide stream was used only to read the file footer.  Each column has added
+  // its own stream. We can use the total reservation now that 'stream_''s resources have
+  // been released. We may benefit from increasing reservation further, so let's compute
+  // the ideal reservation to scan all the columns.
+  int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths);
+  if (ideal_reservation > context_->total_reservation()) {
+    context_->TryIncreaseReservation(ideal_reservation);
+  }
+  scan_node_->runtime_profile()->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)->
+      UpdateCounter(context_->total_reservation());
+  scan_node_->runtime_profile()->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)->
+      UpdateCounter(ideal_reservation);
+
   vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
-      min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
+      min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation());
   for (auto& tmp_reservation : tmp_reservations) {
     column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
   }
   return Status::OK();
 }
 
+int64_t HdfsParquetScanner::ComputeIdealReservation(
+    const vector<int64_t>& col_range_lengths) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  int64_t ideal_reservation = 0;
+  for (int64_t len : col_range_lengths) {
+    ideal_reservation += io_mgr->ComputeIdealBufferReservation(len);
+  }
+  return ideal_reservation;
+}
+
 vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
     int64_t min_buffer_size, int64_t max_buffer_size,
     const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 69749f8..82e761a 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -407,9 +407,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_;
 
-  /// Reservation available for scanning columns, in bytes.
-  int64_t total_col_reservation_ = 0;
-
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -573,13 +570,19 @@ class HdfsParquetScanner : public HdfsScanner {
   /// does not start any scan ranges.
   Status InitScalarColumns() WARN_UNUSED_RESULT;
 
-  /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
-  /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
+  /// Decides how to divide stream_->reservation() between the columns. May increase
+  /// the reservation if more reservation would enable more efficient I/O for the
+  /// current columns being scanned. Sets the reservation on each corresponding reader
+  /// in 'column_readers'.
   Status DivideReservationBetweenColumns(
-      const std::vector<BaseScalarColumnReader*>& column_readers,
-      int64_t reservation_to_distribute);
+      const std::vector<BaseScalarColumnReader*>& column_readers);
+
+  /// Compute the ideal reservation to scan a file with scan range lengths
+  /// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr
+  /// in ExecEnv.
+  static int64_t ComputeIdealReservation(const std::vector<int64_t>& col_range_lengths);
 
-  /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
+  /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
   /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
   /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
   /// a vector with an entry per column with the index into 'col_range_lengths' and the

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index dc88a87..a40323b 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -66,7 +66,6 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
-      ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -83,7 +82,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
       disks_accessed_bitmap_(TUnit::UNIT, 0),
       active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
-  DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -340,16 +338,6 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   }
 
   RETURN_IF_ERROR(ClaimBufferReservation(state));
-  // We got the minimum reservation. Now try to get ideal reservation.
-  if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
-    bool increased = buffer_pool_client_.IncreaseReservation(
-        ideal_scan_range_reservation_ - resource_profile_.min_reservation);
-    VLOG_FILE << "Increasing reservation from minimum "
-              << resource_profile_.min_reservation << "B to ideal "
-              << ideal_scan_range_reservation_ << "B "
-              << (increased ? "succeeded" : "failed");
-  }
-
   reader_context_ = runtime_state_->io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
@@ -382,6 +370,11 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
 
+  initial_range_ideal_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
+      "InitialRangeIdealReservation", TUnit::BYTES);
+  initial_range_actual_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
+      "InitialRangeActualReservation", TUnit::BYTES);
+
   bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
       TUnit::BYTES);
   bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
@@ -488,6 +481,7 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
         break;
       case THdfsFileFormat::ORC:
         RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this, entry.second));
+        break;
       default:
         DCHECK(false) << "Unexpected file type " << entry.first;
     }
@@ -512,6 +506,51 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
   return true;
 }
 
+Status HdfsScanNodeBase::StartNextScanRange(int64_t* reservation,
+    ScanRange** scan_range) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  bool needs_buffers;
+  RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
+        reader_context_.get(), scan_range, &needs_buffers));
+  if (*scan_range == nullptr) return Status::OK();
+  if (needs_buffers) {
+    // Check if we should increase our reservation to read this range more efficiently.
+    // E.g. if we are scanning a large text file, we might want extra I/O buffers to
+    // improve throughput. Note that if this is a columnar format like Parquet,
+    // '*scan_range' is the small footer range only so we won't request an increase.
+    int64_t ideal_scan_range_reservation =
+        io_mgr->ComputeIdealBufferReservation((*scan_range)->len());
+    *reservation = IncreaseReservationIncrementally(*reservation, ideal_scan_range_reservation);
+    initial_range_ideal_reservation_stats_->UpdateCounter(ideal_scan_range_reservation);
+    initial_range_actual_reservation_stats_->UpdateCounter(*reservation);
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+        reader_context_.get(), &buffer_pool_client_, *scan_range, *reservation));
+  }
+  return Status::OK();
+}
+
+int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservation,
+      int64_t ideal_reservation) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  // Check if we could use at least one more max-sized I/O buffer for this range. Don't
+  // increase in smaller increments since we may not be able to use additional smaller
+  // buffers.
+  while (curr_reservation < ideal_reservation) {
+    // Increase to the next I/O buffer multiple or to the ideal reservation.
+    int64_t target = min(ideal_reservation,
+        BitUtil::RoundUpToPowerOf2(curr_reservation + 1, io_mgr->max_buffer_size()));
+    DCHECK_LT(curr_reservation, target);
+    bool increased = buffer_pool_client_.IncreaseReservation(target - curr_reservation);
+    VLOG_FILE << "Increasing reservation from "
+              << PrettyPrinter::PrintBytes(curr_reservation) << " to "
+              << PrettyPrinter::PrintBytes(target) << " "
+              << (increased ? "succeeded" : "failed");
+    if (!increased) break;
+    curr_reservation = target;
+  }
+  return curr_reservation;
+}
+
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts,

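To see the partial-increase behaviour of IncreaseReservationIncrementally()
in isolation, here is a standalone model with a fake buffer pool client
that grants an increase only while spare capacity remains.
FakeReservationClient and RoundUpToMultiple() are hypothetical stand-ins
for the buffer pool client and BitUtil::RoundUpToPowerOf2(); the loop
follows the one in the diff above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical buffer pool client: grants an increase only if it fits.
struct FakeReservationClient {
  int64_t available;
  bool IncreaseReservation(int64_t bytes) {
    if (bytes > available) return false;
    available -= bytes;
    return true;
  }
};

// Rounds 'value' up to the next multiple of 'factor'.
int64_t RoundUpToMultiple(int64_t value, int64_t factor) {
  return (value + factor - 1) / factor * factor;
}

// First step reaches an I/O buffer multiple, then grows in max-buffer-sized
// increments until 'ideal' is reached or an increase is refused.
int64_t IncreaseIncrementally(FakeReservationClient* client, int64_t curr,
    int64_t ideal, int64_t max_buf) {
  while (curr < ideal) {
    int64_t target = std::min(ideal, RoundUpToMultiple(curr + 1, max_buf));
    if (!client->IncreaseReservation(target - curr)) break;
    curr = target;
  }
  return curr;
}
```

For example, starting from 4MB with a 24MB ideal, 8MB max buffers and
12MB of spare memory, the loop steps from 4MB to 8MB to 16MB and stops when
the final 8MB increment is refused, keeping the partial gain.
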
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3a9c37f..4c0a233 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -318,13 +318,19 @@ class HdfsScanNodeBase : public ScanNode {
   bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
       const std::vector<FilterContext>& filter_ctxs);
 
+  /// Helper to increase reservation from 'curr_reservation' up to 'ideal_reservation'
+  /// that may succeed in getting a partial increase if the full increase is not
+  /// possible. First increases to an I/O buffer multiple then increases in I/O buffer
+  /// sized increments. 'curr_reservation' can refer to a 'share' of the total
+  /// reservation of the buffer pool client, e.g. the 'share' belonging to a single
+  /// scanner thread. Returns the new reservation after increases.
+  int64_t IncreaseReservationIncrementally(int64_t curr_reservation,
+      int64_t ideal_reservation);
+
  protected:
   friend class ScannerContext;
   friend class HdfsScanner;
 
-  /// Ideal reservation to process each input split, computed by the planner.
-  const int64_t ideal_scan_range_reservation_;
-
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
   const int min_max_tuple_id_;
 
@@ -481,6 +487,13 @@ class HdfsScanNodeBase : public ScanNode {
   /// taken where there are i concurrent hdfs read thread running. Created in Open().
   std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
 
+  /// Track stats about ideal/actual reservation for initial scan ranges so we can
+  /// determine if the scan got all of the reservation it wanted. Does not include
+  /// subsequent reservation increases done by scanner implementation (e.g. for Parquet
+  /// columns).
+  RuntimeProfile::SummaryStatsCounter* initial_range_ideal_reservation_stats_ = nullptr;
+  RuntimeProfile::SummaryStatsCounter* initial_range_actual_reservation_stats_ = nullptr;
+
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers
   boost::scoped_ptr<MemPool> scan_node_pool_;
@@ -495,6 +508,17 @@ class HdfsScanNodeBase : public ScanNode {
   /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
   Status IssueInitialScanRanges(RuntimeState* state) WARN_UNUSED_RESULT;
 
+  /// Gets the next scan range to process and allocates buffer for it. 'reservation' is
+  /// an in/out argument with the current reservation available for this range. It may
+  /// be increased by this function up to a computed "ideal" reservation, in which case
+  /// *reservation is increased to reflect the new reservation.
+  ///
+  /// Returns Status::OK() and sets 'scan_range' if it gets a range to process. Returns
+  /// Status::OK() and sets 'scan_range' to NULL when no more ranges are left to process.
+  /// Returns an error status if there was an error getting the range or allocating
+  /// buffers.
+  Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
+
   /// Create and open new scanner for this partition type.
   /// If the scanner is successfully created and opened, it is returned in 'scanner'.
   Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 2786742..4573978 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -78,26 +78,19 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_->Close(row_batch);
       scanner_.reset();
     }
-    DiskIoMgr* io_mgr = runtime_state_->io_mgr();
-    bool needs_buffers;
-    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
-        reader_context_.get(), &scan_range_, &needs_buffers));
+    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
+    RETURN_IF_ERROR(StartNextScanRange(&scanner_reservation, &scan_range_));
     if (scan_range_ == nullptr) {
       *eos = true;
       StopAndFinalizeCounters();
       return Status::OK();
     }
-    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
-    if (needs_buffers) {
-      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
-          &buffer_pool_client_, scan_range_, scanner_reservation));
-    }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
     scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
-        partition, filter_ctxs(), expr_results_pool()));
+        scanner_reservation, partition, filter_ctxs(), expr_results_pool()));
     scanner_ctx_->AddStream(scan_range_, scanner_reservation);
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index e8dac08..81b834f 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -156,6 +156,8 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
   row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
+  scanner_thread_reservations_denied_counter_ =
+      ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", TUnit::UNIT);
   return Status::OK();
 }
 
@@ -172,7 +174,6 @@ Status HdfsScanNode::Open(RuntimeState* state) {
     max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
   DCHECK_GT(max_num_scanner_threads_, 0);
-  spare_reservation_.Store(buffer_pool_client_.GetReservation());
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
   return Status::OK();
@@ -215,28 +216,19 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
-bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
+void HdfsScanNode::ReturnReservationFromScannerThread(const unique_lock<mutex>& lock,
+    int64_t bytes) {
   DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
-  int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
-  if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
-  spare_reservation_.Add(increase);
-  return true;
-}
-
-int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
-    bool first_thread) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  int64_t amount;
-  if (first_thread) {
-    amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
-        ideal_scan_range_reservation_ : resource_profile_.min_reservation;
-  } else {
-    amount = ideal_scan_range_reservation_;
+  int64_t curr_reservation = buffer_pool_client_.GetReservation();
+  DCHECK_GE(curr_reservation, resource_profile_.min_reservation);
+  // Release as much memory as possible. Must hold onto the minimum reservation, though.
+  int64_t reservation_decrease =
+      min(bytes, curr_reservation - resource_profile_.min_reservation);
+  if (reservation_decrease > 0) {
+    Status status =
+        buffer_pool_client_.DecreaseReservationTo(curr_reservation - reservation_decrease);
+    DCHECK(status.ok()) << "Not possible, scans don't unpin pages" << status.GetDetail();
   }
-  int64_t remainder = spare_reservation_.Add(-amount);
-  DCHECK_GE(remainder, 0);
-  return amount;
 }
 
 void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
@@ -250,7 +242,8 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   //  4. Don't start up if no initial ranges have been issued (see IMPALA-1722).
   //  5. Don't start up a ScannerThread if materialized_row_batches_ is full since
   //     we are not scanner bound.
-  //  6. Don't start up a thread if there isn't enough memory left to run it.
+  //  6. Don't start up a thread if it is an extra thread and we can't reserve another
+  //     minimum reservation's worth of memory for the thread.
   //  7. Don't start up more than maximum number of scanner threads configured.
   //  8. Don't start up if there are no thread tokens.
 
@@ -269,17 +262,21 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
     const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
     const bool first_thread = num_active_scanner_threads == 0;
+    const int64_t scanner_thread_reservation = resource_profile_.min_reservation;
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
         num_active_scanner_threads >= progress_.remaining()) {
       break;
     }
 
-    // Cases 5 and 6.
-    if (!first_thread &&
-        (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
-         !EnoughReservationForExtraThread(lock))) {
-      break;
+    if (!first_thread) {
+      // Cases 5 and 6.
+      if (materialized_row_batches_->Size() >= max_materialized_row_batches_) break;
+      // The node's min reservation covers the first thread, so only check extra threads.
+      if (!buffer_pool_client_.IncreaseReservation(scanner_thread_reservation)) {
+        COUNTER_ADD(scanner_thread_reservations_denied_counter_, 1);
+        break;
+      }
     }
 
     // Case 7 and 8.
@@ -288,14 +285,10 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       pool->AcquireThreadToken();
     } else if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_
         || !pool->TryAcquireThreadToken()) {
+      ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       break;
     }
 
-    // Deduct the reservation. We haven't dropped the lock since the
-    // first_thread/EnoughReservationForExtraThread() checks so spare reservation
-    // must be available.
-    int64_t scanner_thread_reservation =
-        DeductReservationForScannerThread(lock, first_thread);
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
@@ -307,7 +300,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
-      ReturnReservationFromScannerThread(scanner_thread_reservation);
+      ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       COUNTER_ADD(&active_scanner_thread_counter_, -1);
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
@@ -331,8 +324,6 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reservation) {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
-  DiskIoMgr* io_mgr = runtime_state_->io_mgr();
-
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
   // contexts as the embedded expression evaluators may allocate from it and MemPool
@@ -377,29 +368,19 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange().
+    // Take a snapshot of num_unqueued_files_ before calling StartNextScanRange().
     // We don't want num_unqueued_files_ to go to zero between the return from
-    // GetNextUnstartedRange() and the check for when all ranges are complete.
+    // StartNextScanRange() and the check for when all ranges are complete.
     int num_unqueued_files = num_unqueued_files_.Load();
     // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any earlier
     // stores that need to complete?)
     AtomicUtil::MemoryBarrier();
     ScanRange* scan_range;
-    bool needs_buffers;
-    Status status =
-        io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers);
-
+    Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
     if (status.ok() && scan_range != nullptr) {
-      if (needs_buffers) {
-        status = io_mgr->AllocateBuffersForRange(
-            reader_context_.get(), &buffer_pool_client_, scan_range,
-            scanner_thread_reservation);
-      }
-      if (status.ok()) {
-        // Got a scan range. Process the range end to end (in this thread).
-        status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-            &expr_results_pool, scan_range, scanner_thread_reservation);
-      }
+      // Got a scan range. Process the range end to end (in this thread).
+      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+          &expr_results_pool, scan_range, &scanner_thread_reservation);
     }
 
     if (!status.ok()) {
@@ -429,8 +410,9 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
       // needed to acquire the lock in x86.
       unique_lock<mutex> l(lock_);
-      // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means
-      // that every range is either done or being processed by another thread.
+      // All ranges have been queued and DiskIoMgr has no more new ranges for this scan
+      // node to process. This means that every range is either done or being processed by
+      // another thread.
       all_ranges_started_ = true;
       break;
     }
@@ -438,7 +420,10 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
-  ReturnReservationFromScannerThread(scanner_thread_reservation);
+  {
+    unique_lock<mutex> l(lock_);
+    ReturnReservationFromScannerThread(l, scanner_thread_reservation);
+  }
   runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
@@ -447,7 +432,7 @@ exit:
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool, ScanRange* scan_range,
-    int64_t scanner_thread_reservation) {
+    int64_t* scanner_thread_reservation) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -472,9 +457,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     return Status::OK();
   }
 
-  ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
-      filter_ctxs, expr_results_pool);
-  context.AddStream(scan_range, scanner_thread_reservation);
+  ScannerContext context(runtime_state_, this, &buffer_pool_client_,
+      *scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
+  context.AddStream(scan_range, *scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {
@@ -510,6 +495,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
 
   // Transfer remaining resources to a final batch and add it to the row batch queue.
   scanner->Close();
+  // Reservation may have been increased by the scanner, e.g. Parquet may allocate
+  // additional reservation to scan columns.
+  *scanner_thread_reservation = context.total_reservation();
   return status;
 }
 

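The reservation accounting in ThreadTokenAvailableCb() and ReturnReservationFromScannerThread() can be modeled in isolation. The sketch below is a simplified, hypothetical model: FakeReservationClient and ScanNodeModel are illustrative names, not Impala classes, and reservations are plain integers. The first thread runs on the node's minimum reservation. Each extra thread must obtain another minimum's worth of reservation or is counted as denied. A finishing thread releases what it can without dropping the node below its minimum. The model omits thread tokens, the row-batch queue check and the early return taken when no token is available.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for a buffer pool client: the reservation can only grow
// while it stays at or under 'limit' (a simplification of real BufferPool rules).
class FakeReservationClient {
 public:
  FakeReservationClient(int64_t initial, int64_t limit)
    : reservation_(initial), limit_(limit) {}
  int64_t GetReservation() const { return reservation_; }
  bool IncreaseReservation(int64_t bytes) {
    if (reservation_ + bytes > limit_) return false;
    reservation_ += bytes;
    return true;
  }
  void DecreaseReservationTo(int64_t target) { reservation_ = target; }

 private:
  int64_t reservation_;
  int64_t limit_;
};

// Hypothetical model of the scan node's per-thread reservation bookkeeping.
class ScanNodeModel {
 public:
  ScanNodeModel(int64_t min_reservation, int64_t limit)
    : min_reservation_(min_reservation), client_(min_reservation, limit) {}

  // Returns true if a scanner thread may start. The first thread uses the node's
  // initial (minimum) reservation; each extra thread needs another minimum's worth.
  bool TryStartThread() {
    std::lock_guard<std::mutex> l(lock_);
    if (active_threads_ > 0 && !client_.IncreaseReservation(min_reservation_)) {
      ++reservations_denied_;
      return false;
    }
    ++active_threads_;
    return true;
  }

  // Releases up to 'bytes' but never drops below the node's minimum reservation.
  void FinishThread(int64_t bytes) {
    std::lock_guard<std::mutex> l(lock_);
    --active_threads_;
    int64_t curr = client_.GetReservation();
    int64_t decrease = std::min(bytes, curr - min_reservation_);
    if (decrease > 0) client_.DecreaseReservationTo(curr - decrease);
  }

  int64_t reservation() const { return client_.GetReservation(); }
  int reservations_denied() const { return reservations_denied_; }

 private:
  const int64_t min_reservation_;
  FakeReservationClient client_;
  std::mutex lock_;
  int active_threads_ = 0;
  int reservations_denied_ = 0;
};
```

With a minimum of 4 units and a limit of 12, three threads start (reservation 4, 8, 12), the fourth is denied, and after all three finish the node is back at exactly its minimum of 4.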
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 2dfbb10..48684bd 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -64,9 +64,11 @@ class TPlanNode;
 /// ------------------
 /// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
 /// client. The scan node ensures that enough reservation is available to start a
-/// scanner thread before launching each one with (see
-/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
-/// for staying within the reservation handed off to it.
+/// scanner thread before launching each one, after which the scanner thread must
+/// stay within the reservation handed off to it. Scanner threads can try to increase
+/// their reservation if desired (e.g. for scanning columnar formats like Parquet), but
+/// must be able to make progress within the initial reservation handed off from the scan
+/// node.
 ///
 /// TODO: Remove this class once the fragment-based multi-threaded execution is
 /// fully functional.
@@ -150,19 +152,15 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// the number of cores.
   int max_num_scanner_threads_;
 
-  /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
-  /// threads. Doled out to scanner threads when they are started and returned when
-  /// those threads no longer need it. Can be atomically incremented without holding
-  /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
-  /// reservation and the actual deduction are atomic with respect to other threads
-  /// trying to claim reservation.
-  AtomicInt64 spare_reservation_{0};
+  /// Number of times scanner threads were not created because a reservation increase
+  /// was denied.
+  RuntimeProfile::Counter* scanner_thread_reservations_denied_counter_ = nullptr;
 
   /// The wait time for fetching a row batch from the row batch queue.
-  RuntimeProfile::Counter* row_batches_get_timer_;
+  RuntimeProfile::Counter* row_batches_get_timer_ = nullptr;
 
   /// The wait time for enqueuing a row batch into the row batch queue.
-  RuntimeProfile::Counter* row_batches_put_timer_;
+  RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;
 
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
@@ -174,34 +172,24 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// 'first_thread' is true if this was the first scanner thread to start and
   /// it acquired a "required" thread token from ThreadResourceMgr.
   /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
-  /// this thread with DeductReservationForScannerThread().
+  /// this thread. Before returning, this function releases the reservation with
+  /// ReturnReservationFromScannerThread().
   void ScannerThread(bool first_thread, int64_t scanner_thread_reservation);
 
   /// Process the entire scan range with a new scanner object. Executed in scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
-  /// in this split.
+  /// in this split. 'scanner_thread_reservation' is an in/out argument that tracks the
+  /// total reservation from 'buffer_pool_client_' that is allotted for this thread's
+  /// use.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool, io::ScanRange* scan_range,
-      int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
-
-  /// Return true if there is enough reservation to start an extra scanner thread.
-  /// Tries to increase reservation if enough is not already available in
-  /// 'spare_reservation_'. 'lock_' must be held via 'lock'
-  bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
-
-  /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
-  /// 'first_thread' is true, this is the first thread to be started and only the
-  /// minimum reservation is required to be available. Otherwise
-  /// EnoughReservationForExtra() thread must have returned true in the current
-  /// critical section so that 'ideal_scan_range_bytes_' is available for the extra
-  /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
-  int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
-      bool first_thread);
-
-  /// Called by scanner thread to return or all of its reservation that is not needed.
-  void ReturnReservationFromScannerThread(int64_t bytes) {
-    spare_reservation_.Add(bytes);
-  }
+      int64_t* scanner_thread_reservation) WARN_UNUSED_RESULT;
+
+  /// Called by scanner thread to return some or all of its reservation that is not
+  /// needed. Always holds onto at least the minimum reservation to avoid violating the
+  /// invariants of ExecNode::buffer_pool_client_. 'lock_' must be held via 'lock'.
+  void ReturnReservationFromScannerThread(const boost::unique_lock<boost::mutex>& lock,
+      int64_t bytes);
 
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index c669e65..280eac9 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,12 +41,14 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-    BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
+    BufferPool::ClientHandle* bp_client, int64_t total_reservation,
+    HdfsPartitionDescriptor* partition_desc,
     const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
     bp_client_(bp_client),
+    total_reservation_(total_reservation),
     partition_desc_(partition_desc),
     filter_ctxs_(filter_ctxs),
     expr_results_pool_(expr_results_pool) {
@@ -56,6 +58,11 @@ ScannerContext::~ScannerContext() {
   DCHECK(streams_.empty());
 }
 
+void ScannerContext::TryIncreaseReservation(int64_t ideal_reservation) {
+  total_reservation_ = scan_node_->IncreaseReservationIncrementally(
+      total_reservation_, ideal_reservation);
+}
+
 void ScannerContext::ReleaseCompletedResources(bool done) {
   for (int i = 0; i < streams_.size(); ++i) {
     streams_[i]->ReleaseCompletedResources(done);

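ScannerContext::TryIncreaseReservation() delegates to the scan node's IncreaseReservationIncrementally(), whose body is not part of this excerpt. As an assumption, one plausible policy is to step toward the ideal reservation one max-sized I/O buffer at a time and stop at the first denial. That produces the "none, part or all" outcome documented for TryIncreaseReservation() in scanner-context.h. The function below is a hypothetical sketch in which 'try_increase' stands in for BufferPool::ClientHandle::IncreaseReservation().

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>

// Rounds 'value' up to the next multiple of 'factor' (factor > 0).
int64_t RoundUpToMultiple(int64_t value, int64_t factor) {
  return (value + factor - 1) / factor * factor;
}

// Hypothetical sketch: grow 'curr' toward 'ideal' one max-sized I/O buffer at a
// time, stopping at the first denied request. Returns the new total reservation,
// which may be unchanged, partly increased, or equal to 'ideal'.
int64_t IncreaseReservationIncrementally(int64_t curr, int64_t ideal,
    int64_t max_buffer_size, const std::function<bool(int64_t)>& try_increase) {
  while (curr < ideal) {
    // Next step: the next buffer-size multiple above 'curr', capped at 'ideal'.
    int64_t target = std::min(ideal, RoundUpToMultiple(curr + 1, max_buffer_size));
    if (!try_increase(target - curr)) break;
    curr = target;
  }
  return curr;
}
```

Stepping in whole max-sized buffers, rather than asking for the full difference at once, lets a scanner keep a partial increase when memory is tight instead of getting nothing.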
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 6292486..0bb8d74 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -86,9 +86,10 @@ class ScannerContext {
  public:
   /// Create a scanner context with the parent scan_node (where materialized row batches
   /// get pushed to) and the scan range to process. Buffers are allocated using
-  /// 'bp_client'.
+  /// 'bp_client'. 'total_reservation' bytes of 'bp_client''s reservation have been
+  /// initially allotted for use by this scanner.
   ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-      BufferPool::ClientHandle* bp_client,
+      BufferPool::ClientHandle* bp_client, int64_t total_reservation,
       HdfsPartitionDescriptor* partition_desc,
       const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
@@ -350,6 +351,13 @@ class ScannerContext {
 
   int NumStreams() const { return streams_.size(); }
 
+  /// Tries to increase 'total_reservation()' to 'ideal_reservation'. May get
+  /// none, part or all of the requested increase. total_reservation() can be
+  /// checked by the caller to find out the new total reservation. When this
+  /// ScannerContext is destroyed, the scan node takes back ownership of
+  /// total_reservation().
+  void TryIncreaseReservation(int64_t ideal_reservation);
+
   /// Release completed resources for all streams, e.g. the last buffer in each stream if
   /// the current read position is at the end of the buffer. If 'done' is true all
   /// resources are freed, even if the caller has not read that data yet. After calling
@@ -370,8 +378,9 @@ class ScannerContext {
   /// buffers that it needs allocated. 'reservation' is the amount of reservation that
   /// is given to this stream for allocating I/O buffers. The reservation is shared with
   /// 'range', so the context must be careful not to use this until all of 'range's
-  /// buffers have been freed. Must be >= the minimum IoMgr buffer size o allow reading
-  /// past the end of 'range'.
+  /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
+  /// past the end of 'range'. 'reservation' must be <=
+  /// ScannerContext::total_reservation(), i.e. this reservation is included in the total.
   ///
   /// Returns the added stream. The returned stream is owned by this context.
   Stream* AddStream(io::ScanRange* range, int64_t reservation);
@@ -382,6 +391,7 @@ class ScannerContext {
   bool cancelled() const;
 
   BufferPool::ClientHandle* bp_client() const { return bp_client_; }
+  int64_t total_reservation() const { return total_reservation_; }
   HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
   MemPool* expr_results_pool() const { return expr_results_pool_; }
@@ -396,6 +406,11 @@ class ScannerContext {
   /// call thread-safe BufferPool methods with this client.
   BufferPool::ClientHandle* const bp_client_;
 
+  /// Total reservation from 'bp_client_' that this scanner is allowed to use.
+  /// TODO: when we remove the multi-threaded scan node, we may be able to just use
+  /// bp_client_->GetReservation().
+  int64_t total_reservation_;
+
   HdfsPartitionDescriptor* const partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 50f6fc4..170b47e 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -505,6 +505,20 @@ vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max
   return buffer_sizes;
 }
 
+int64_t DiskIoMgr::ComputeIdealBufferReservation(int64_t scan_range_len) {
+  if (scan_range_len < max_buffer_size_) {
+    // Round up to nearest power-of-two buffer size - ideally we should do a single read
+    // I/O for this range.
+    return max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(scan_range_len));
+  } else {
+    // Round up to the nearest max-sized I/O buffer, capped by
+    // IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE - we should do one or more max-sized read
+    // I/Os for this range.
+    return min(IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size_,
+        BitUtil::RoundUpToPowerOf2(scan_range_len, max_buffer_size_));
+  }
+}
+
 // This function gets the next RequestRange to work on for this disk. It checks for
 // cancellation and
 // a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.

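To make the two branches of DiskIoMgr::ComputeIdealBufferReservation() concrete, here is a standalone restatement with assumed sizes: an 8 KB minimum buffer and an 8 MB maximum buffer (the real values come from DiskIoMgr configuration), plus IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3 as declared in disk-io-mgr.h. The helper functions are hypothetical replacements for the BitUtil rounding functions used in the diff.

```cpp
#include <algorithm>
#include <cstdint>

// Assumed buffer sizes for illustration only.
constexpr int64_t kMinBufferSize = 8 * 1024;
constexpr int64_t kMaxBufferSize = 8 * 1024 * 1024;
constexpr int64_t kIdealMaxSizedBuffersPerScanRange = 3;

// Smallest power of two >= v (v > 0).
int64_t RoundUpToPowerOfTwo(int64_t v) {
  int64_t p = 1;
  while (p < v) p <<= 1;
  return p;
}

// Rounds v up to a multiple of 'factor'.
int64_t RoundUpToMultiple(int64_t v, int64_t factor) {
  return (v + factor - 1) / factor * factor;
}

int64_t ComputeIdealBufferReservation(int64_t scan_range_len) {
  if (scan_range_len < kMaxBufferSize) {
    // Small range: one power-of-two buffer, ideally a single read I/O.
    return std::max(kMinBufferSize, RoundUpToPowerOfTwo(scan_range_len));
  }
  // Large range: whole max-sized buffers, capped at the ideal buffer count.
  return std::min(kIdealMaxSizedBuffersPerScanRange * kMaxBufferSize,
      RoundUpToMultiple(scan_range_len, kMaxBufferSize));
}
```

Under these assumptions a 100-byte range gets 8 KB, a range of 1 MB plus one byte rounds up to 2 MB, a 10 MB range gets 16 MB, and any range larger than 16 MB is capped at 24 MB.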
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index b6b4b75..cc1bb37 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -386,6 +386,10 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_NUM_DISKS
   };
 
+  /// Compute the ideal reservation for processing a scan range of 'scan_range_len' bytes.
+  /// See "Buffer Management" in the class comment for explanation.
+  int64_t ComputeIdealBufferReservation(int64_t scan_range_len);
+
   /// The ideal number of max-sized buffers per scan range to maximise throughput.
   /// See "Buffer Management" in the class comment for explanation.
   static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 5f205a7..c7d50de 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -621,6 +621,15 @@ RuntimeProfile::Counter* RuntimeProfile::GetCounter(const string& name) {
   return NULL;
 }
 
+RuntimeProfile::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter(
+    const string& name) {
+  lock_guard<SpinLock> l(summary_stats_map_lock_);
+  if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
+    return summary_stats_map_[name];
+  }
+  return nullptr;
+}
+
 void RuntimeProfile::GetCounters(const string& name, vector<Counter*>* counters) {
   Counter* c = GetCounter(name);
   if (c != NULL) counters->push_back(c);

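RuntimeProfile::GetSummaryStatsCounter() looks the name up twice: once with find() and again with operator[]. The sketch below shows a single-lookup variant that reuses the iterator returned by find(). It assumes a std::map-like container and uses std::mutex in place of Impala's SpinLock. ProfileModel and SummaryStatsCounter here are hypothetical stand-ins, not the real classes.

```cpp
#include <map>
#include <mutex>
#include <string>

struct SummaryStatsCounter { std::string name; };  // hypothetical stand-in

class ProfileModel {
 public:
  void Add(SummaryStatsCounter* c) {
    std::lock_guard<std::mutex> l(lock_);
    map_[c->name] = c;
  }

  // Single lookup: reuse the iterator from find() instead of a second search via
  // operator[]. Returns nullptr when no counter has that name.
  SummaryStatsCounter* Get(const std::string& name) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = map_.find(name);
    return it == map_.end() ? nullptr : it->second;
  }

 private:
  std::mutex lock_;
  std::map<std::string, SummaryStatsCounter*> map_;
};
```

The behavior matches the diff. The only gain is that the map is searched once while the lock is held.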
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 320edf3..a7f9a95 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -201,6 +201,10 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// that name.
   Counter* GetCounter(const std::string& name);
 
+  /// Gets the summary stats counter with 'name'. Returns NULL if there is no summary
+  /// stats counter with that name.
+  SummaryStatsCounter* GetSummaryStatsCounter(const std::string& name);
+
   /// Adds all counters with 'name' that are registered either in this or
   /// in any of the child profiles to 'counters'.
   void GetCounters(const std::string& name, std::vector<Counter*>* counters);

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 22198aa..01698ce 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -240,9 +240,6 @@ struct THdfsScanNode {
   // The byte offset of the slot for Parquet metadata if Parquet count star optimization
   // is enabled.
   10: optional i32 parquet_count_star_slot_offset
-
-  // The ideal memory reservation in bytes to process an input split.
-  11: optional i64 ideal_scan_range_reservation
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index dfaaf66..c798d96 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -163,6 +163,7 @@ public class ColumnStats {
   public long getMaxSize() { return maxSize_; }
   public boolean hasNulls() { return numNulls_ > 0; }
   public long getNumNulls() { return numNulls_; }
+  public boolean hasAvgSize() { return avgSize_ >= 0; }
   public boolean hasAvgSerializedSize() { return avgSerializedSize_ >= 0; }
   public boolean hasNumDistinctValues() { return numDistinctValues_ >= 0; }
   public boolean hasStats() { return numNulls_ != -1 || numDistinctValues_ != -1; }

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 2179346..e0850c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -194,6 +194,15 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     public String getFileName() { return fbFileDescriptor_.fileName(); }
     public long getFileLength() { return fbFileDescriptor_.length(); }
 
+    /** Compute the total length of files in fileDescs */
+    public static long computeTotalFileLength(Collection<FileDescriptor> fileDescs) {
+      long totalLength = 0;
+      for (FileDescriptor fileDesc: fileDescs) {
+        totalLength += fileDesc.getFileLength();
+      }
+      return totalLength;
+    }
+
     public HdfsCompression getFileCompression() {
       return HdfsCompression.valueOf(FbCompression.name(fbFileDescriptor_.compression()));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index a1f47aa..398393b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -48,6 +48,7 @@ import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
@@ -59,7 +60,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
-import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.fb.FbFileBlock;
@@ -147,6 +147,16 @@ public class HdfsScanNode extends ScanNode {
   // between 128kb and 1.1mb.
   private static final long MIN_MEMORY_ESTIMATE = 1L * 1024L * 1024L;
 
+  // Default reservation in bytes for an IoMgr scan range for a column in columnar
+  // formats like Parquet. Chosen to allow reasonably efficient I/O for all columns
+  // even with only the minimum reservation, but not to use excessive memory for columns
+  // where we overestimate the size.
+  // TODO: is it worth making this a tunable query option?
+  private static final long DEFAULT_COLUMN_SCAN_RANGE_RESERVATION = 4L * 1024L * 1024L;
+
+  // Read size for Parquet and ORC footers. Matches HdfsScanner::FOOTER_SIZE in backend.
+  private static final long FOOTER_SIZE = 100L * 1024L;
+
   private final HdfsTable tbl_;
 
   // List of partitions to be scanned. Partitions have been pruned.
@@ -173,12 +183,7 @@ public class HdfsScanNode extends ScanNode {
 
   // Number of bytes in the largest scan range (i.e. hdfs split). Set in
   // computeScanRangeLocations().
-  private long maxScanRangeBytes_ = 0;
-
-  // The ideal reservation to process a single scan range (i.e. hdfs split), >= the
-  // minimum reservation. Generally provides enough memory to overlap CPU and I/O and
-  // maximize throughput. Set in computeResourceProfile().
-  private long idealScanRangeReservation_ = -1;
+  private long largestScanRangeBytes_ = 0;
 
   // Input cardinality based on the partition row counts or extrapolation. -1 if invalid.
   // Both values can be valid to report them in the explain plan, but only one of them is
@@ -186,6 +191,10 @@ public class HdfsScanNode extends ScanNode {
   private long partitionNumRows_ = -1;
   private long extrapolatedNumRows_ = -1;
 
+  // Estimated row count of the largest scan range. -1 if no stats are available.
+  // Set in computeScanRangeLocations()
+  private long maxScanRangeNumRows_ = -1;
+
   // True if this scan node should use the MT implementation in the backend.
   private boolean useMtScanNode_;
 
@@ -735,13 +744,14 @@ public class HdfsScanNode extends ScanNode {
       sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, 0, randomSeed);
     }
 
-    long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options()
+    long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
     scanRanges_ = Lists.newArrayList();
     numPartitions_ = (sampledFiles != null) ? sampledFiles.size() : partitions_.size();
     totalFiles_ = 0;
     totalBytes_ = 0;
-    maxScanRangeBytes_ = 0;
+    largestScanRangeBytes_ = 0;
+    maxScanRangeNumRows_ = -1;
     fileFormats_ = Sets.newHashSet();
     for (HdfsPartition partition: partitions_) {
       List<FileDescriptor> fileDescs = partition.getFileDescriptors();
@@ -750,6 +760,7 @@ public class HdfsScanNode extends ScanNode {
         fileDescs = sampledFiles.get(Long.valueOf(partition.getId()));
         if (fileDescs == null) continue;
       }
+      long partitionNumRows = partition.getNumRows();
 
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
       fileFormats_.add(partition.getFileFormat());
@@ -764,10 +775,11 @@ public class HdfsScanNode extends ScanNode {
       }
       boolean checkMissingDiskIds = FileSystemUtil.supportsStorageIds(partitionFs);
       boolean partitionMissingDiskIds = false;
-
+      final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
+      long partitionMaxScanRangeBytes = 0;
+      totalBytes_ += partitionBytes;
       totalFiles_ += fileDescs.size();
       for (FileDescriptor fileDesc: fileDescs) {
-        totalBytes_ += fileDesc.getFileLength();
         boolean fileDescMissingDiskIds = false;
         for (int j = 0; j < fileDesc.getNumFileBlocks(); ++j) {
           FbFileBlock block = fileDesc.getFbFileBlock(j);
@@ -803,8 +815,8 @@ public class HdfsScanNode extends ScanNode {
           long remainingLength = FileBlock.getLength(block);
           while (remainingLength > 0) {
             long currentLength = remainingLength;
-            if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
-              currentLength = maxScanRangeLength;
+            if (scanRangeBytesLimit > 0 && remainingLength > scanRangeBytesLimit) {
+              currentLength = scanRangeBytesLimit;
             }
             TScanRange scanRange = new TScanRange();
             scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
@@ -815,7 +827,9 @@ public class HdfsScanNode extends ScanNode {
             scanRangeLocations.scan_range = scanRange;
             scanRangeLocations.locations = locations;
             scanRanges_.add(scanRangeLocations);
-            maxScanRangeBytes_ = Math.max(maxScanRangeBytes_, currentLength);
+            largestScanRangeBytes_ = Math.max(largestScanRangeBytes_, currentLength);
+            partitionMaxScanRangeBytes =
+                Math.max(partitionMaxScanRangeBytes, currentLength);
             remainingLength -= currentLength;
             currentOffset += currentLength;
           }
@@ -829,59 +843,41 @@ public class HdfsScanNode extends ScanNode {
         }
       }
       if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
+      if (partitionMaxScanRangeBytes > 0 && partitionNumRows >= 0) {
+        updateMaxScanRangeNumRows(
+            partitionNumRows, partitionBytes, partitionMaxScanRangeBytes);
+      }
     }
-  }
-
-  /**
-   * Compute the number of columns that are read from the file, as opposed to
-   * materialised based on metadata. If there are nested collections, counts the
-   * number of leaf scalar slots per collection. This matches Parquet's "shredded"
-   * approach to nested collections, where each nested field is stored as a separate
-   * column. We may need to adjust this logic for non-shredded columnar formats if added.
-   */
-  private int computeNumColumnsReadFromFile() {
-    HdfsTable table = (HdfsTable) desc_.getTable();
-    int numColumns = 0;
-    boolean havePosSlot = false;
-    for (SlotDescriptor slot: desc_.getSlots()) {
-      if (!slot.isMaterialized() || slot == countStarSlot_) continue;
-      if (slot.getColumn() == null ||
-          slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
-        if (slot.isArrayPosRef()) {
-          // Position virtual slots can be materialized by piggybacking on another slot.
-          havePosSlot = true;
-        } else if (slot.getType().isScalarType()) {
-          ++numColumns;
-        } else {
-          numColumns += computeNumColumnsReadForCollection(slot);
-        }
+    if (totalFiles_ == 0) {
+      maxScanRangeNumRows_ = 0;
+    } else {
+      // Also estimate max rows per scan range based on table-level stats, in case some
+      // or all partition-level stats were missing.
+      long tableNumRows = tbl_.getNumRows();
+      if (tableNumRows >= 0) {
+        updateMaxScanRangeNumRows(tableNumRows, totalBytes_, largestScanRangeBytes_);
       }
     }
-    // Must scan something to materialize a position slot.
-    if (havePosSlot) numColumns = Math.max(numColumns, 1);
-    return numColumns;
   }
 
   /**
-   * Compute the number of columns read from disk for materialized scalar slots in
-   * the provided tuple.
+   * Update the estimate of the maximum number of rows per scan range, based on the
+   * fraction of the partition's or table's total bytes held by the largest scan range.
    */
-  private int computeNumColumnsReadForCollection(SlotDescriptor collectionSlot) {
-    Preconditions.checkState(collectionSlot.getType().isCollectionType());
-    int numColumns = 0;
-    for (SlotDescriptor nestedSlot: collectionSlot.getItemTupleDesc().getSlots()) {
-      // Position virtual slots can be materialized by piggybacking on another slot.
-      if (!nestedSlot.isMaterialized() || nestedSlot.isArrayPosRef()) continue;
-      if (nestedSlot.getType().isScalarType()) {
-        ++numColumns;
-      } else {
-        numColumns += computeNumColumnsReadForCollection(nestedSlot);
-      }
+  private void updateMaxScanRangeNumRows(long totalRows, long totalBytes,
+      long maxScanRangeBytes) {
+    Preconditions.checkState(totalRows >= 0);
+    Preconditions.checkState(totalBytes >= 0);
+    Preconditions.checkState(maxScanRangeBytes >= 0);
+    // Check for zeros first to avoid the possibility of dividing by zero below.
+    long estimate;
+    if (maxScanRangeBytes == 0 || totalBytes == 0 || totalRows == 0) {
+      estimate = 0;
+    } else {
+      double divisor = totalBytes / (double) maxScanRangeBytes;
+      estimate = (long)(totalRows / divisor);
     }
-    // Need to scan at least one column to materialize the pos virtual slot and/or
-    // determine the size of the nested array.
-    numColumns = Math.max(numColumns, 1);
-    return numColumns;
+    maxScanRangeNumRows_ = Math.max(maxScanRangeNumRows_, estimate);
   }
 
   /**
@@ -1076,8 +1072,6 @@ public class HdfsScanNode extends ScanNode {
     }
     msg.hdfs_scan_node.setRandom_replica(randomReplica_);
     msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
-    Preconditions.checkState(idealScanRangeReservation_ >= 0, idealScanRangeReservation_);
-    msg.hdfs_scan_node.setIdeal_scan_range_reservation(idealScanRangeReservation_);
     if (!collectionConjuncts_.isEmpty()) {
       Map<Integer, List<TExpr>> tcollectionConjuncts = Maps.newLinkedHashMap();
       for (Map.Entry<TupleDescriptor, List<Expr>> entry:
@@ -1155,6 +1149,8 @@ public class HdfsScanNode extends ScanNode {
         extrapRows = "unavailable";
       }
       output.append(String.format("%sextrapolated-rows=%s", detailPrefix, extrapRows));
+      output.append(String.format(" max-scan-range-rows=%s",
+          maxScanRangeNumRows_ == -1 ? "unavailable" : maxScanRangeNumRows_));
       output.append("\n");
       if (numScanRangesNoDiskIds_ > 0) {
         output.append(String.format("%smissing disk ids: " +
@@ -1256,21 +1252,26 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges.");
     if (scanRanges_.isEmpty()) {
       nodeResourceProfile_ = ResourceProfile.noReservation(0);
-      idealScanRangeReservation_ = 0;
       return;
     }
     Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
     Preconditions.checkNotNull(desc_);
     Preconditions.checkNotNull(desc_.getTable() instanceof HdfsTable);
     HdfsTable table = (HdfsTable) desc_.getTable();
-    int numColumnsReadFromFile = computeNumColumnsReadFromFile();
+    List<Long> columnReservations = null;
+    if (fileFormats_.contains(HdfsFileFormat.PARQUET)
+        || fileFormats_.contains(HdfsFileFormat.ORC)) {
+      columnReservations = computeMinColumnReservations();
+    }
+
     int perHostScanRanges;
     if (table.getMajorityFormat() == HdfsFileFormat.PARQUET
         || table.getMajorityFormat() == HdfsFileFormat.ORC) {
+      Preconditions.checkNotNull(columnReservations);
       // For the purpose of this estimation, the number of per-host scan ranges for
       // Parquet/ORC files are equal to the number of columns read from the file. I.e.
       // excluding partition columns and columns that are populated from file metadata.
-      perHostScanRanges = numColumnsReadFromFile;
+      perHostScanRanges = columnReservations.size();
     } else {
       perHostScanRanges = (int) Math.ceil((
           (double) scanRanges_.size() / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
@@ -1310,50 +1311,182 @@ public class HdfsScanNode extends ScanNode {
     }
     perInstanceMemEstimate = Math.max(perInstanceMemEstimate, MIN_MEMORY_ESTIMATE);
 
-    Pair<Long, Long> reservation = computeReservation(numColumnsReadFromFile);
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(reservation.first).build();
-    idealScanRangeReservation_ = reservation.second;
+        .setMinReservationBytes(computeMinReservation(columnReservations)).build();
   }
 
-  /*
-   *  Compute the minimum and ideal memory reservation to process a single scan range
-   *  (i.e. hdfs split). Bound the reservation based on:
-   * - One minimum-sized buffer per IoMgr scan range, which is the absolute minimum
-   *   required to scan the data.
-   * - A maximum of either 1 or 3 max-sized I/O buffers per IoMgr scan range for
-   *   the minimum and ideal reservation respectively. 1 max-sized I/O buffer avoids
-   *   issuing small I/O unnecessarily while 3 max-sized I/O buffers guarantees higher
-   *   throughput by overlapping compute and I/O efficiently.
-   * - A maximum reservation of the hdfs split size, to avoid reserving excessive
-   *   memory for small files or ranges, e.g. small dimension tables with very few
-   *   rows.
+  /**
+   * Compute the minimum reservation to process a single scan range (i.e. hdfs split).
+   * We aim to choose a reservation that is as low as possible while still giving OK
+   * performance when running with only the minimum reservation. The lower bound is one
+   * minimum-sized buffer per IoMgr scan range - the absolute minimum required to scan
+   * the data. The upper bounds are:
+   * - One max-sized I/O buffer per IoMgr scan range. One max-sized I/O buffer avoids
+   *   issuing small I/O unnecessarily. The backend can try to increase the reservation
+   *   further if more memory would speed up processing.
+   * - File format-specific calculations, e.g. based on estimated column sizes for
+   *   Parquet.
+   * - The hdfs split size, to avoid reserving excessive memory for small files or ranges,
+   *   e.g. small dimension tables with very few rows.
    */
-  private Pair<Long, Long> computeReservation(int numColumnsReadFromFile) {
-    Preconditions.checkState(maxScanRangeBytes_ >= 0);
+  private long computeMinReservation(List<Long> columnReservations) {
+    Preconditions.checkState(largestScanRangeBytes_ >= 0);
     long maxIoBufferSize =
         BitUtil.roundUpToPowerOf2(BackendConfig.INSTANCE.getReadSize());
-    // Scanners for columnar formats issue one IoMgr scan range for metadata, followed by
-    // one IoMgr scan range per column in parallel. Scanners for row-oriented formats
-    // issue only one IoMgr scan range at a time.
-    int iomgrScanRangesPerSplit = fileFormats_.contains(HdfsFileFormat.PARQUET) ?
-        Math.max(1, numColumnsReadFromFile) : 1;
-    // Need one buffer per IoMgr scan range to execute the scan.
-    long minReservationToExecute =
-        iomgrScanRangesPerSplit * BackendConfig.INSTANCE.getMinBufferSize();
-
-    // Quantize the max scan range (i.e. hdfs split) size to an I/O buffer size.
-    long quantizedMaxScanRangeBytes = maxScanRangeBytes_ < maxIoBufferSize ?
-        BitUtil.roundUpToPowerOf2(maxScanRangeBytes_) :
-        BitUtil.roundUpToPowerOf2Factor(maxScanRangeBytes_, maxIoBufferSize);
-    long minReservationBytes = Math.max(minReservationToExecute,
-        Math.min(iomgrScanRangesPerSplit * maxIoBufferSize,
-            quantizedMaxScanRangeBytes));
-    long idealReservationBytes = Math.max(minReservationToExecute,
-        Math.min(iomgrScanRangesPerSplit * maxIoBufferSize * 3,
-            quantizedMaxScanRangeBytes));
-    return Pair.create(minReservationBytes, idealReservationBytes);
+    long reservationBytes = 0;
+    for (HdfsFileFormat format: fileFormats_) {
+      long formatReservationBytes = 0;
+      // TODO: IMPALA-6875 - ORC should compute total reservation across columns once the
+      // ORC scanner supports reservations. For now it is treated the same as a
+      // row-oriented format because there is no per-column reservation.
+      if (format == HdfsFileFormat.PARQUET) {
+        // With Parquet, we first read the footer then all of the materialized columns in
+        // parallel.
+        for (long columnReservation : columnReservations) {
+          formatReservationBytes += columnReservation;
+        }
+        formatReservationBytes = Math.max(FOOTER_SIZE, formatReservationBytes);
+      } else {
+        // Scanners for row-oriented formats issue only one IoMgr scan range at a time.
+        // The minimum reservation is one max-sized I/O buffer per IoMgr scan range, so
+        // that the scanner can issue efficient large I/Os.
+        formatReservationBytes = maxIoBufferSize;
+      }
+      reservationBytes = Math.max(reservationBytes, formatReservationBytes);
+    }
+    reservationBytes = roundUpToIoBuffer(reservationBytes, maxIoBufferSize);
+
+    // Clamp the reservation we computed above to range:
+    // * minimum: <# concurrent io mgr ranges> * <min buffer size>, the absolute minimum
+    //   needed to execute the scan.
+    // * maximum: the maximum scan range (i.e. HDFS split size), rounded up to
+    //   the total size of the I/O buffers needed to read it all at once.
+    int iomgrScanRangesPerSplit = columnReservations != null ?
+        Math.max(1, columnReservations.size()) : 1;
+    long maxReservationBytes = roundUpToIoBuffer(largestScanRangeBytes_, maxIoBufferSize);
+    return Math.max(iomgrScanRangesPerSplit * BackendConfig.INSTANCE.getMinBufferSize(),
+        Math.min(reservationBytes, maxReservationBytes));
+  }
+
+  /**
+   * Compute minimum memory reservations in bytes per column per scan range for each of
+   * the columns read from disk for a columnar format. Returns the raw estimate for
+   * each column, not quantized to a buffer size.
+   *
+   * If there are nested collections, returns a size for each of the leaf scalar slots
+   * per collection. This matches Parquet's "shredded" approach to nested collections,
+   * where each nested field is stored as a separate column. We may need to adjust this
+   * logic for nested types in non-shredded columnar formats (e.g. IMPALA-6503 - ORC)
+   * if/when that is added.
+   */
+  private List<Long> computeMinColumnReservations() {
+    List<Long> columnByteSizes = Lists.newArrayList();
+    HdfsTable table = (HdfsTable) desc_.getTable();
+    boolean havePosSlot = false;
+    for (SlotDescriptor slot: desc_.getSlots()) {
+      if (!slot.isMaterialized() || slot == countStarSlot_) continue;
+      if (slot.getColumn() == null ||
+          slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
+        if (slot.isArrayPosRef()) {
+          // Position virtual slots can be materialized by piggybacking on another slot.
+          havePosSlot = true;
+        } else if (slot.getType().isScalarType()) {
+          Column column = slot.getColumn();
+          if (column == null) {
+            // Not a top-level column, e.g. a value from a nested collection that is
+            // being unnested by the scanner. No stats are available for nested
+            // collections.
+            columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+          } else {
+            columnByteSizes.add(computeMinScalarColumnReservation(column));
+          }
+        } else {
+          appendMinColumnReservationsForCollection(slot, columnByteSizes);
+        }
+      }
+    }
+    if (havePosSlot && columnByteSizes.isEmpty()) {
+      // Must scan something to materialize a position slot. We don't know anything about
+      // the column that we're scanning so use the default reservation.
+      columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+    }
+    return columnByteSizes;
+  }
+
+  /**
+   * Helper for computeMinColumnReservations() - compute minimum memory reservations for
+   * all of the scalar columns read from disk when materializing collectionSlot. Appends
+   * one number per scalar column to columnByteSizes.
+   */
+  private void appendMinColumnReservationsForCollection(SlotDescriptor collectionSlot,
+      List<Long> columnByteSizes) {
+    Preconditions.checkState(collectionSlot.getType().isCollectionType());
+    boolean addedColumn = false;
+    for (SlotDescriptor nestedSlot: collectionSlot.getItemTupleDesc().getSlots()) {
+      // Position virtual slots can be materialized by piggybacking on another slot.
+      if (!nestedSlot.isMaterialized() || nestedSlot.isArrayPosRef()) continue;
+      if (nestedSlot.getType().isScalarType()) {
+        // No column stats are available for nested collections so use the default
+        // reservation.
+        columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+        addedColumn = true;
+      } else {
+        appendMinColumnReservationsForCollection(nestedSlot, columnByteSizes);
+      }
+    }
+    // Need to scan at least one column to materialize the pos virtual slot and/or
+    // determine the size of the nested array. Assume it is the size of a single I/O
+    // buffer.
+    if (!addedColumn) columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+  }
+
+  /**
+   * Choose the min bytes to reserve for this scalar column for a scan range. Returns the
+   * raw estimate without quantizing to buffer sizes - the caller should do so if needed.
+   *
+   * Starts with DEFAULT_COLUMN_SCAN_RANGE_RESERVATION and tries different strategies to
+   * infer that the column data is smaller than this starting value (and therefore the
+   * extra memory would not be useful). These estimates are quite conservative so this
+   * will still often overestimate the column size. An overestimate does not necessarily
+   * result in memory being wasted because the Parquet scanner distributes the total
+   * reservation between columns based on actual column size, so if multiple columns are
+   * scanned, memory over-reserved for one column can be used to help scan a different
+   * larger column.
+   */
+  private long computeMinScalarColumnReservation(Column column) {
+    Preconditions.checkNotNull(column);
+    long reservationBytes = DEFAULT_COLUMN_SCAN_RANGE_RESERVATION;
+    ColumnStats stats = column.getStats();
+    if (stats.hasAvgSize() && maxScanRangeNumRows_ != -1) {
+      // Estimate the column's uncompressed data size based on row count and average
+      // size.
+      reservationBytes =
+          (long) Math.min(reservationBytes, stats.getAvgSize() * maxScanRangeNumRows_);
+      if (stats.hasNumDistinctValues()) {
+        // Estimate the data size with dictionary compression, assuming all distinct
+        // values occur in the scan range with the largest number of rows and that each
+        // value can be represented with approximately log2(ndv) bits. Even if Parquet
+        // dictionary compression does not kick in, general-purpose compression
+        // algorithms like Snappy can often find redundancy when there are repeated
+        // values.
+        long dictBytes = (long)(stats.getAvgSize() * stats.getNumDistinctValues());
+        long bitsPerVal = BitUtil.log2Ceiling(stats.getNumDistinctValues());
+        long encodedDataBytes = bitsPerVal * maxScanRangeNumRows_ / 8;
+        reservationBytes = Math.min(reservationBytes, dictBytes + encodedDataBytes);
+      }
+    }
+    return reservationBytes;
+  }
+
+  /**
+   * Calculate the total bytes of I/O buffers that would be allocated to hold bytes,
+   * given that buffers must be a power-of-two size <= maxIoBufferSize bytes.
+   */
+  private static long roundUpToIoBuffer(long bytes, long maxIoBufferSize) {
+    return bytes < maxIoBufferSize ?
+        BitUtil.roundUpToPowerOf2(bytes) :
+        BitUtil.roundUpToPowerOf2Factor(bytes, maxIoBufferSize);
   }
 
   /**
@@ -1362,6 +1495,8 @@ public class HdfsScanNode extends ScanNode {
    * Therefore, this upper bound is independent of the number of concurrent scans and
    * queries and helps to derive a tighter per-host memory estimate for queries with
    * multiple concurrent scans.
+   * TODO: this doesn't accurately describe how the backend works, but it is useful to
+   * have an upper bound. We should rethink and replace this with a different upper bound.
    */
   public static long getPerHostMemUpperBound() {
     // THREADS_PER_CORE each using a default of

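To make the planner arithmetic in updateMaxScanRangeNumRows(), computeMinScalarColumnReservation() and computeMinReservation() concrete, here is a hypothetical standalone sketch of the same calculations for a Parquet-only scan. It is not the Impala code: the BitUtil-style helpers are re-implemented here and only assumed to behave like Impala's BitUtil, the table stats in main() are invented, and the 8MB maximum I/O buffer and 8KB minimum buffer are assumed backend settings rather than values taken from this commit.

```java
// Hypothetical sketch of the HdfsScanNode reservation arithmetic (Parquet only).
// Helpers are re-implementations assumed to match Impala's BitUtil; the stats and
// buffer sizes used in main() are illustrative assumptions.
public class ColumnReservationSketch {
  static final long MB = 1024L * 1024L;
  static final long DEFAULT_COLUMN_SCAN_RANGE_RESERVATION = 4L * MB;
  static final long FOOTER_SIZE = 100L * 1024L;

  // Smallest power of two >= v (for v >= 1).
  static long roundUpToPowerOf2(long v) {
    return v <= 1 ? 1 : Long.highestOneBit(v - 1) << 1;
  }

  // Round v up to a multiple of factor, where factor is a power of two.
  static long roundUpToPowerOf2Factor(long v, long factor) {
    return (v + factor - 1) & ~(factor - 1);
  }

  // ceil(log2(v)) for v >= 1.
  static long log2Ceiling(long v) {
    return 64 - Long.numberOfLeadingZeros(v - 1);
  }

  // Like updateMaxScanRangeNumRows(): scale the row count by the fraction of the
  // total bytes held by the largest scan range.
  static long estimateMaxScanRangeRows(long totalRows, long totalBytes,
      long maxScanRangeBytes) {
    if (maxScanRangeBytes == 0 || totalBytes == 0 || totalRows == 0) return 0;
    double divisor = totalBytes / (double) maxScanRangeBytes;
    return (long) (totalRows / divisor);
  }

  // Like computeMinScalarColumnReservation(). In this sketch avgSize < 0,
  // ndv <= 0 or maxRangeRows == -1 mean "stat not available".
  static long columnReservation(double avgSize, long ndv, long maxRangeRows) {
    long bytes = DEFAULT_COLUMN_SCAN_RANGE_RESERVATION;
    if (avgSize >= 0 && maxRangeRows != -1) {
      // Uncompressed size estimate: rows * average value size.
      bytes = (long) Math.min(bytes, avgSize * maxRangeRows);
      if (ndv > 0) {
        // Dictionary-style estimate: the dictionary plus ~log2(ndv) bits per row.
        long dictBytes = (long) (avgSize * ndv);
        long encodedDataBytes = log2Ceiling(ndv) * maxRangeRows / 8;
        bytes = Math.min(bytes, dictBytes + encodedDataBytes);
      }
    }
    return bytes;
  }

  static long roundUpToIoBuffer(long bytes, long maxIoBufferSize) {
    return bytes < maxIoBufferSize
        ? roundUpToPowerOf2(bytes)
        : roundUpToPowerOf2Factor(bytes, maxIoBufferSize);
  }

  // Parquet-only case of computeMinReservation(): sum the columns, take at least the
  // footer size, round to I/O buffers, then clamp to [ranges * minBuffer, split size].
  static long parquetMinReservation(long[] columnReservations, long maxIoBufferSize,
      long minBufferSize, long largestScanRangeBytes) {
    long sum = 0;
    for (long r : columnReservations) sum += r;
    long reservation = roundUpToIoBuffer(Math.max(FOOTER_SIZE, sum), maxIoBufferSize);
    long maxReservation = roundUpToIoBuffer(largestScanRangeBytes, maxIoBufferSize);
    int ranges = Math.max(1, columnReservations.length);
    return Math.max(ranges * minBufferSize, Math.min(reservation, maxReservation));
  }

  public static void main(String[] args) {
    // 6M rows in 768MB; the largest split is 256MB, i.e. 1/3 of the bytes.
    long rows = estimateMaxScanRangeRows(6_000_000L, 768 * MB, 256 * MB);
    long[] cols = {
        columnReservation(4.0, -1, rows),  // no NDV: capped at the 4MB default
        columnReservation(8.0, 100, rows), // 800 + 7 bits * rows / 8
        columnReservation(1.0, 2, rows),   // 2 + 1 bit * rows / 8
    };
    System.out.println(rows);                  // 2000000
    for (long c : cols) System.out.println(c); // 4194304, 1750800, 250002
    // Assumed backend settings: 8MB max I/O buffer, 8KB min buffer.
    System.out.println(parquetMinReservation(cols, 8 * MB, 8 * 1024, 256 * MB)); // 8388608
  }
}
```

With these made-up inputs the three column estimates sum to about 5.9MB, which rounds up to a single 8MB buffer. A file whose largest split is smaller than the rounded reservation would instead be clamped down to the split size, which is the "small dimension table" case described in the computeMinReservation() comment.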
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index 818c573..629c44b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -104,6 +104,20 @@ public class TestUtils {
 
   static FileSizeFilter fileSizeFilter_ = new FileSizeFilter();
 
+  // Ignore the exact estimated row count, which depends on the file sizes.
+  static class ScanRangeRowCountFilter implements ResultFilter {
+    private static final String NUMBER_FILTER = "\\d+(\\.\\d+)?";
+    private static final String FILTER_KEY = " max-scan-range-rows=";
+
+    public boolean matches(String input) { return input.contains(FILTER_KEY); }
+
+    public String transform(String input) {
+      return input.replaceAll(FILTER_KEY + NUMBER_FILTER, FILTER_KEY);
+    }
+  }
+
+  static ScanRangeRowCountFilter scanRangeRowCountFilter_ = new ScanRangeRowCountFilter();
+
   /**
    * Do a line-by-line comparison of actual and expected output.
    * Comparison of the individual lines ignores whitespace.
@@ -142,6 +156,11 @@ public class TestUtils {
         expectedStr = fileSizeFilter_.transform(expectedStr);
         actualStr = fileSizeFilter_.transform(actualStr);
       }
+      if (scanRangeRowCountFilter_.matches(expectedStr)) {
+        containsPrefix = true;
+        expectedStr = scanRangeRowCountFilter_.transform(expectedStr);
+        actualStr = scanRangeRowCountFilter_.transform(actualStr);
+      }
 
       boolean ignoreAfter = false;
       for (int j = 0; j < ignoreContentAfter_.length; ++j) {

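The ScanRangeRowCountFilter added to TestUtils strips the number after " max-scan-range-rows=" so that planner tests do not depend on exact file sizes. The hypothetical standalone copy below runs that same regex against made-up EXPLAIN lines (modeled on the format string in the HdfsScanNode diff) to show which values are normalized and which are left alone.

```java
// Hypothetical standalone copy of the ScanRangeRowCountFilter logic, applied to
// invented EXPLAIN lines for illustration.
public class ScanRangeRowCountFilterDemo {
  private static final String NUMBER_FILTER = "\\d+(\\.\\d+)?";
  private static final String FILTER_KEY = " max-scan-range-rows=";

  static boolean matches(String input) { return input.contains(FILTER_KEY); }

  // Drop the number after the key so that differing row estimates compare equal.
  static String transform(String input) {
    return input.replaceAll(FILTER_KEY + NUMBER_FILTER, FILTER_KEY);
  }

  public static void main(String[] args) {
    String expected = "   extrapolated-rows=unavailable max-scan-range-rows=2000000";
    String actual = "   extrapolated-rows=unavailable max-scan-range-rows=1987654";
    System.out.println(matches(expected));                             // true
    System.out.println(transform(expected).equals(transform(actual))); // true
    // "unavailable" is not a number, so it is kept and still compared verbatim.
    String noStats = "   extrapolated-rows=unavailable max-scan-range-rows=unavailable";
    System.out.println(transform(noStats).equals(noStats));            // true
  }
}
```

Because only digits are stripped, a test that expects a row estimate will still fail if the planner reports "unavailable", which keeps the stats-present versus stats-missing distinction visible in expected plans.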
http://git-wip-us.apache.org/repos/asf/impala/blob/418c7057/testdata/bin/compute-table-stats.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/compute-table-stats.sh b/testdata/bin/compute-table-stats.sh
index 98434ee..63eb0da 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -39,7 +39,7 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
   ${COMPUTE_STATS_SCRIPT} --db_name=functional_hbase\
     --table_names="alltypessmall,stringids"
 fi
-${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet \
+${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
     --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet