Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 23:42:06 UTC

[15/15] impala git commit: IMPALA-4835: switch I/O buffers to buffer pool

IMPALA-4835: switch I/O buffers to buffer pool

This is a squash of the following patches, which were previously reverted.

I will fix the known issues with some follow-on patches.

======================================================================
IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation

In preparation for switching the I/O mgr to the buffer pool, this
removes and cleans up a lot of code so that the switchover patch starts
from a cleaner slate.

* Remove the free buffer cache (which will be replaced by buffer pool's
  own caching).
* Make memory limit exceeded error checking synchronous (in anticipation
  of having to propagate buffer pool errors synchronously).
* Simplify error propagation - remove the (ineffectual) code that
  enqueued BufferDescriptors containing error statuses.
* Document locking scheme better in a few places, make it part of the
  function signature when it seemed reasonable.
* Move ReturnBuffer() to ScanRange, because it is intrinsically
  connected with the lifecycle of a scan range.
* Separate external ReturnBuffer() and internal CleanUpBuffer()
  interfaces - previously callers of ReturnBuffer() were fudging
  the num_buffers_in_reader accounting to make the external interface work.
* Eliminate redundant state in ScanRange: 'eosr_returned_' and
  'is_cancelled_'.
* Clarify the logic around calling Close() for the last
  BufferDescriptor (see the sketch after this list).
  -> There appeared to be an implicit assumption that buffers would be
     freed in the order they were returned from the scan range, so that
     the "eos" buffer was returned last. Instead, just count the number
     of outstanding buffers to detect the last one.
  -> Touching the is_cancelled_ field without holding a lock was hard to
     reason about - it violated the locking rules and it was unclear
     whether it was race-free.
* Remove DiskIoMgr::Read() to simplify the interface. It is trivial to
  inline at the callsites.
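
To illustrate the counting approach from the Close() bullet above, here is a
minimal sketch - not the patch itself - where 'num_buffers_in_range_',
'all_data_returned_' and 'FreeBuffer()' are hypothetical names standing in
for the real members:

    // Called with ScanRange::lock_ held. Frees 'buffer' and, if it was the
    // last outstanding buffer of a range that is done (eos reached or
    // cancelled), closes the range. No assumption about buffer return order.
    void ScanRange::CleanUpBuffer(std::unique_ptr<BufferDescriptor> buffer) {
      FreeBuffer(std::move(buffer));  // hypothetical: release buffer memory
      --num_buffers_in_range_;        // hypothetical outstanding-buffer count
      if (num_buffers_in_range_ == 0 && all_data_returned_) Close();
    }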

This will probably regress performance somewhat because of the cache
removal, so my plan is to merge it around the same time as switching
the I/O mgr to allocate from the buffer pool. I'm keeping the patches
separate to make reviewing easier.

Testing:
* Ran exhaustive tests
* Ran the disk-io-mgr-stress-test overnight

======================================================================
IMPALA-4835: Part 2: Allocate scan range buffers upfront

This change is a step towards reserving memory for buffers from the
buffer pool and constraining per-scanner memory requirements. This
change restructures the DiskIoMgr code so that each ScanRange operates
with a fixed set of buffers that are allocated upfront and recycled as
the I/O mgr works through the ScanRange.

One major change is that ScanRanges get blocked when a buffer is not
available and get unblocked when a client returns a buffer via
ReturnBuffer(). I was able to remove the logic to maintain the
blocked_ranges_ list by instead adding a separate set with all ranges
that are active.
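
A minimal sketch of that blocking scheme, assuming hypothetical member names
('blocked_on_buffer_', 'unused_iomgr_buffers_', 'ScheduleOnDiskQueue()') for
the real state and scheduling hook:

    // A client has finished with a buffer and hands it back. Recycling it
    // can unblock a range that ran out of free buffers mid-scan.
    void ScanRange::ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer) {
      std::lock_guard<std::mutex> l(lock_);
      unused_iomgr_buffers_.push_back(std::move(buffer));  // recycle
      if (blocked_on_buffer_) {
        blocked_on_buffer_ = false;
        ScheduleOnDiskQueue(this);  // hypothetical: re-enqueue for more I/O
      }
    }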

There is also some miscellaneous cleanup included - e.g. reducing the
amount of code devoted to maintaining counters and metrics.

One tricky part of the existing code was that it called
IssueInitialRanges() with empty lists of files and depended on
DiskIoMgr::AddScanRanges() to not check for cancellation in that case.
See IMPALA-6564/IMPALA-6588. I changed the logic to not try to issue
ranges for empty lists of files.

I plan to merge this along with the actual buffer pool switch, but
separated it out to allow review of the DiskIoMgr changes separate from
other aspects of the buffer pool switchover.

Testing:
* Ran core and exhaustive tests.

======================================================================
IMPALA-4835: Part 3: switch I/O buffers to buffer pool

This is the final patch to switch the Disk I/O manager to allocate all
buffers from the buffer pool and to reserve the buffers required for
a query upfront.

* The planner reserves enough memory to run a single scanner per
  scan node.
* The multi-threaded scan node must increase reservation before
  spinning up more threads (see the sketch after this list).
* The scanner implementations must be careful to stay within their
  assigned reservation.
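
A sketch of the reservation check in the multi-threaded scan node, under the
assumption (from the diff below) that 'buffer_pool_client_' and
'ideal_scan_range_reservation_' are the relevant members; the wrapper
function itself is hypothetical:

    // Before spinning up another scanner thread, try to grow this node's
    // reservation by one thread's worth. If the increase is denied, no new
    // thread is started and existing threads keep their reservation.
    bool HdfsScanNode::TryIncreaseReservationForNewThread() {
      return buffer_pool_client_.IncreaseReservation(
          ideal_scan_range_reservation_);
    }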

The row-oriented scanners were the most straightforward, since they only
have a single scan range active at a time. A single I/O buffer is
sufficient to scan the whole file but more I/O buffers can improve I/O
throughput.
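
Concretely (a sketch based on constants visible elsewhere in this patch: the
I/O mgr's max_buffer_size() of 8MB and
DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE of 3), the ideal per-range
reservation for these formats is just:

    // Ideal reservation for one row-oriented scan range: enough to keep
    // three max-sized buffers in flight, reading ahead while one is consumed.
    int64_t ideal_reservation =
        DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size;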

Parquet is more complex because it issues a scan range per column and
the sizes of the columns on disk are not known during planning. To
deal with this, the reservation in the frontend is based on a
heuristic involving the file size and # columns. The Parquet scanner
can then divvy up the reservation between columns based on the size of
the column data on disk.
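
The heuristic itself lives in HdfsScanNode.java (see the diffstat below) and
is not reproduced here. As a rough, hypothetical illustration of its shape
only - the real formula and constants may differ:

    // Hypothetical sketch: give each materialized column up to three
    // max-sized I/O buffers, but never reserve more than the largest file
    // could occupy.
    int64_t IdealScanRangeReservation(int64_t max_file_size, int num_cols) {
      const int64_t kMaxBufferSize = 8 * 1024 * 1024;  // 8MB I/O buffers
      return std::min<int64_t>(num_cols * 3 * kMaxBufferSize,
          BitUtil::RoundUpToPowerOfTwo(max_file_size));
    }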

I adjusted how the 'mem_limit' is divided between buffer pool and
non-buffer-pool memory for low mem_limits to account for the increase
in buffer pool memory.
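
The split is computed in reservation-util.cc (a two-line change in the
diffstat below). A sketch of this kind of calculation, with illustrative
constants rather than the actual tuned values:

    // Cap buffer pool reservations at a fraction of the mem_limit, while
    // always leaving fixed headroom for non-buffer-pool memory.
    int64_t GetReservationLimitFromMemLimit(int64_t mem_limit) {
      const double kFraction = 0.8;                    // illustrative
      const int64_t kMinRemaining = 75 * 1024 * 1024;  // illustrative
      return std::max<int64_t>(0, std::min<int64_t>(
          mem_limit - kMinRemaining, mem_limit * kFraction));
    }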

Testing:
* Added more planner tests to cover reservation calcs for scan node.
* Test scanners for all file formats with the reservation denial debug
  action, to test behaviour when the scanners hit reservation limits.
* Updated memory and buffer pool limits for tests.
* Added unit tests for dividing reservation between columns in parquet,
  since the algorithm is non-trivial.

Perf:
I ran TPC-H and targeted perf locally comparing with master. Both
showed small improvements of a few percent and no regressions of
note. Cluster perf tests showed no significant change.

Change-Id: I3ef471dc0746f0ab93b572c34024fc7343161f00
Reviewed-on: http://gerrit.cloudera.org:8080/9679
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fb5dc9eb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fb5dc9eb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fb5dc9eb

Branch: refs/heads/master
Commit: fb5dc9eb484e54cf9f37d06168392c5bc2a0f4fe
Parents: 789c5aa
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sun Oct 29 12:38:47 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Sat Apr 28 23:41:39 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |    1 +
 be/src/exec/base-sequence-scanner.cc            |    1 +
 be/src/exec/base-sequence-scanner.h             |    3 +-
 be/src/exec/hdfs-lzo-text-scanner.cc            |    1 +
 be/src/exec/hdfs-orc-scanner.cc                 |   10 +-
 be/src/exec/hdfs-orc-scanner.h                  |    2 +-
 be/src/exec/hdfs-parquet-scanner-test.cc        |   96 +
 be/src/exec/hdfs-parquet-scanner.cc             |  226 ++-
 be/src/exec/hdfs-parquet-scanner.h              |   44 +-
 be/src/exec/hdfs-scan-node-base.cc              |   99 +-
 be/src/exec/hdfs-scan-node-base.h               |    3 +
 be/src/exec/hdfs-scan-node-mt.cc                |   20 +-
 be/src/exec/hdfs-scan-node.cc                   |  166 +-
 be/src/exec/hdfs-scan-node.h                    |   66 +-
 be/src/exec/hdfs-scanner.cc                     |    5 +-
 be/src/exec/hdfs-text-scanner.cc                |    6 +-
 be/src/exec/parquet-column-readers.cc           |  107 +-
 be/src/exec/parquet-column-readers.h            |   88 +-
 be/src/exec/scanner-context.cc                  |   42 +-
 be/src/exec/scanner-context.h                   |   54 +-
 be/src/runtime/bufferpool/buffer-pool.h         |    1 +
 .../bufferpool/reservation-tracker-test.cc      |    8 +-
 be/src/runtime/bufferpool/reservation-util.cc   |    2 +-
 be/src/runtime/exec-env.cc                      |    7 +-
 be/src/runtime/io/disk-io-mgr-internal.h        |   16 +
 be/src/runtime/io/disk-io-mgr-stress-test.cc    |   43 +-
 be/src/runtime/io/disk-io-mgr-stress.cc         |   89 +-
 be/src/runtime/io/disk-io-mgr-stress.h          |   26 +-
 be/src/runtime/io/disk-io-mgr-test.cc           |  850 ++++----
 be/src/runtime/io/disk-io-mgr.cc                |  718 ++-----
 be/src/runtime/io/disk-io-mgr.h                 |  383 ++--
 be/src/runtime/io/request-context.cc            |  239 ++-
 be/src/runtime/io/request-context.h             |  318 +--
 be/src/runtime/io/request-ranges.h              |  196 +-
 be/src/runtime/io/scan-range.cc                 |  309 +--
 be/src/runtime/mem-tracker.h                    |    1 -
 be/src/runtime/test-env.cc                      |    2 +-
 be/src/runtime/tmp-file-mgr-test.cc             |    3 +-
 be/src/runtime/tmp-file-mgr.cc                  |   18 +-
 be/src/runtime/tmp-file-mgr.h                   |   17 +-
 be/src/util/bit-util-test.cc                    |   11 +
 be/src/util/bit-util.h                          |    8 +-
 be/src/util/impalad-metrics.cc                  |   13 +-
 be/src/util/impalad-metrics.h                   |    9 -
 common/thrift/PlanNodes.thrift                  |    3 +
 .../apache/impala/analysis/SlotDescriptor.java  |   19 +
 .../org/apache/impala/analysis/SlotRef.java     |   20 -
 .../org/apache/impala/planner/HdfsScanNode.java |  169 +-
 .../java/org/apache/impala/util/BitUtil.java    |    6 +
 .../org/apache/impala/util/BitUtilTest.java     |    6 +
 .../queries/PlannerTest/constant-folding.test   |   42 +-
 .../queries/PlannerTest/disable-codegen.test    |   24 +-
 .../PlannerTest/fk-pk-join-detection.test       |   78 +-
 .../queries/PlannerTest/max-row-size.test       |   80 +-
 .../PlannerTest/min-max-runtime-filters.test    |    6 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   40 +-
 .../queries/PlannerTest/parquet-filtering.test  |   42 +-
 .../queries/PlannerTest/partition-pruning.test  |    4 +-
 .../PlannerTest/resource-requirements.test      | 1814 ++++++++++++++----
 .../PlannerTest/sort-expr-materialization.test  |   34 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  192 +-
 .../queries/PlannerTest/tablesample.test        |   44 +-
 .../queries/PlannerTest/union.test              |    8 +-
 .../admission-reject-min-reservation.test       |   12 +-
 .../queries/QueryTest/analytic-fns.test         |    5 +-
 .../queries/QueryTest/codegen-mem-limit.test    |    5 +-
 .../QueryTest/disk-spill-encryption.test        |    2 +-
 .../queries/QueryTest/explain-level0.test       |    4 +-
 .../queries/QueryTest/explain-level1.test       |    4 +-
 .../queries/QueryTest/explain-level2.test       |   14 +-
 .../queries/QueryTest/explain-level3.test       |   14 +-
 .../queries/QueryTest/nested-types-tpch.test    |    6 +-
 .../queries/QueryTest/runtime_row_filters.test  |   11 +-
 .../queries/QueryTest/scanners.test             |    7 +
 .../functional-query/queries/QueryTest/set.test |    2 +-
 .../queries/QueryTest/spilling-aggs.test        |   19 +-
 .../spilling-naaj-no-deny-reservation.test      |    7 +-
 .../queries/QueryTest/spilling-naaj.test        |    8 +-
 .../QueryTest/spilling-no-debug-action.test     |   66 +
 .../QueryTest/spilling-sorts-exhaustive.test    |   10 +-
 .../queries/QueryTest/spilling.test             |   76 +-
 .../queries/QueryTest/stats-extrapolation.test  |   40 +-
 .../tpch/queries/sort-reservation-usage.test    |    9 +-
 tests/common/test_dimensions.py                 |   16 +-
 tests/custom_cluster/test_scratch_disk.py       |    2 +-
 tests/query_test/test_mem_usage_scaling.py      |   11 +-
 tests/query_test/test_query_mem_limit.py        |    4 +-
 tests/query_test/test_scanners.py               |   25 +-
 tests/query_test/test_scanners_fuzz.py          |    9 +-
 tests/query_test/test_sort.py                   |   16 +-
 tests/query_test/test_spilling.py               |    5 +
 91 files changed, 4568 insertions(+), 2799 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index ccec3fb..0317cfe 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -111,3 +111,4 @@ ADD_BE_TEST(parquet-version-test)
 ADD_BE_TEST(row-batch-list-test)
 ADD_BE_TEST(incr-stats-util-test)
 ADD_BE_TEST(hdfs-avro-scanner-test)
+ADD_BE_TEST(hdfs-parquet-scanner-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9cb6330..9d95b0b 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -46,6 +46,7 @@ static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes
 
 Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   // Issue just the header range for each file.  When the header is complete,
   // we'll issue the splits for that file.  Splits cannot be processed until the
   // header is parsed (the header object is then shared across splits for that file).

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 887ff6f..3c2326e 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -47,7 +47,8 @@ class ScannerContext;
 /// situation, causing the block to be incorrectly skipped.
 class BaseSequenceScanner : public HdfsScanner {
  public:
-  /// Issue the initial ranges for all sequence container files.
+  /// Issue the initial ranges for all sequence container files. 'files' must not be
+  /// empty.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc
index 88ae295..8af89f2 100644
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ b/be/src/exec/hdfs-lzo-text-scanner.cc
@@ -62,6 +62,7 @@ HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
 
 Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   if (LzoIssueInitialRanges == NULL) {
     lock_guard<SpinLock> l(lzo_load_lock_);
     if (library_load_status_.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-orc-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 3660600..f12230e 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -21,6 +21,7 @@
 
 #include "exec/scanner-context.inline.h"
 #include "exprs/expr.h"
+#include "runtime/exec-env.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/tuple-row.h"
 #include "util/decompress.h"
@@ -35,6 +36,7 @@ DEFINE_bool(enable_orc_scanner, true,
 
 Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   for (HdfsFileDesc* file : files) {
     // If the file size is less than 10 bytes, it is an invalid ORC file.
     if (file->file_length < 10) {
@@ -112,10 +114,12 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
   Status status;
   {
     SCOPED_TIMER(scanner_->state_->total_storage_wait_timer());
-    status = scanner_->state_->io_mgr()->Read(
-        scanner_->scan_node_->reader_context(), range, &io_buffer);
+    bool needs_buffers;
+    status = ExecEnv::GetInstance()->disk_io_mgr()->StartScanRange(
+          scanner_->scan_node_->reader_context(), range, &needs_buffers);
+    DCHECK(!status.ok() || !needs_buffers) << "Already provided a buffer";
   }
-  if (io_buffer != nullptr) scanner_->state_->io_mgr()->ReturnBuffer(move(io_buffer));
+  if (io_buffer != nullptr) range->ReturnBuffer(move(io_buffer));
   if (!status.ok()) throw ResourceError(status);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-orc-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 837d92a..5bdf4e7 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -80,7 +80,7 @@ class HdfsOrcScanner : public HdfsScanner {
     }
 
     uint64_t getNaturalReadSize() const {
-      return scanner_->state_->io_mgr()->max_read_buffer_size();
+      return scanner_->state_->io_mgr()->max_buffer_size();
     }
 
     void read(void* buf, uint64_t length, uint64_t offset);

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
new file mode 100644
index 0000000..cbc6e76
--- /dev/null
+++ b/be/src/exec/hdfs-parquet-scanner-test.cc
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
+static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
+
+namespace impala {
+
+class HdfsParquetScannerTest : public testing::Test {
+ protected:
+  void TestDivideReservation(const vector<int64_t>& col_range_lengths,
+      int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
+};
+
+/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
+/// inputs 'col_range_lengths' and 'total_col_reservation'.
+void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
+      int64_t total_col_reservation, const vector<int64_t>& expected_reservations) {
+  vector<pair<int, int64_t>> reservations =
+      HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+      MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation);
+  for (int i = 0; i < reservations.size(); ++i) {
+    LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second;
+  }
+  EXPECT_EQ(reservations.size(), expected_reservations.size());
+  vector<bool> present(expected_reservations.size(), false);
+  for (auto& reservation: reservations) {
+    // Ensure that each appears exactly once.
+    EXPECT_FALSE(present[reservation.first]);
+    present[reservation.first] = true;
+    EXPECT_EQ(expected_reservations[reservation.first], reservation.second)
+        << reservation.first;
+  }
+}
+
+TEST_F(HdfsParquetScannerTest, DivideReservation) {
+  // Test long scan ranges with lots of memory - should allocate 3 max-sized
+  // buffers per range.
+  TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024,
+        {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+
+  // Long scan ranges, not enough memory for 3 buffers each. Should only allocate
+  // max-sized buffers, preferring the longer scan range.
+  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE,
+        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024},
+        5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE,
+        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1,
+        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+
+  // Test a short range with lots of memory - should round up buffer size.
+  TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024});
+
+  // Test a range << MIN_BUFFER_SIZE - should round up to buffer size.
+  TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE});
+
+  // Test long ranges with limited memory.
+  TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2});
+  TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE},
+      3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE});
+
+  // Test a mix of scan range lengths larger than and smaller than the max I/O buffer
+  // size. Long ranges get allocated most memory.
+  TestDivideReservation(
+      {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304},
+      25165824,
+      {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144});
+}
+
+}
+
+IMPALA_TEST_MAIN();
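
Working through the last case above: the reservation is 25165824 bytes (3 x
8MB) across eight columns. Columns are visited in descending order of range
length (ties broken by index), so the three 15145047-byte ranges come first:
two get a max-sized 8MB buffer each, and the third gets the largest
power-of-two allocation that still fits, 4MB. The shorter ranges then receive
power-of-two allocations rounded down to the remaining reservation: 2MB, 1MB,
512KB, 256KB and 256KB, which consumes the reservation exactly and matches
the expected result.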

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 73dd29b..8f9d8ca 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,6 +17,7 @@
 
 #include "exec/hdfs-parquet-scanner.h"
 
+#include <algorithm>
 #include <queue>
 
 #include <gutil/strings/substitute.h>
@@ -27,6 +28,7 @@
 #include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/runtime-filter.inline.h"
@@ -35,13 +37,10 @@
 #include "common/names.h"
 
 using std::move;
+using std::sort;
 using namespace impala;
 using namespace impala::io;
 
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-const int MAX_DICT_HEADER_SIZE = 100;
-
 // Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
 // has fewer entries, then the entire column is dictionary encoded. This threshold
 // is guaranteed to be true for Impala versions 2.9 or below.
@@ -57,8 +56,11 @@ const char* HdfsParquetScanner::LLVM_CLASS_NAME = "class.impala::HdfsParquetScan
 const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
 
+namespace impala {
+
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   for (HdfsFileDesc* file : files) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
     if (file->file_length < 12) {
@@ -69,8 +71,6 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files);
 }
 
-namespace impala {
-
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : HdfsScanner(scan_node, state),
     row_group_idx_(-1),
@@ -161,9 +161,14 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
+  int64_t stream_reservation = stream_->reservation();
+  stream_ = nullptr;
   context_->ReleaseCompletedResources(true);
   context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
+  // The scanner-wide stream was used only to read the file footer.  Each column has added
+  // its own stream. We can use the reservation from 'stream_' for the columns now.
+  total_col_reservation_ = stream_reservation;
 
   // Parse the file schema into an internal representation for schema resolution.
   schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -180,10 +185,6 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
 
   RETURN_IF_ERROR(InitDictFilterStructures());
-
-  // The scanner-wide stream was used only to read the file footer.  Each column has added
-  // its own stream.
-  stream_ = nullptr;
   return Status::OK();
 }
 
@@ -595,15 +596,13 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     InitCollectionColumns();
+    RETURN_IF_ERROR(InitScalarColumns());
 
-    // Prepare dictionary filtering columns for first read
-    // This must be done before dictionary filtering, because this code initializes
-    // the column offsets and streams needed to read the dictionaries.
-    // TODO: Restructure the code so that the dictionary can be read without the rest
-    // of the column.
-    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
+    // Start scanning dictionary filtering column readers, so we can read the dictionary
+    // pages in EvalDictionaryFilters().
+    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
 
-    // InitColumns() may have allocated resources to scan columns. If we skip this row
+    // StartScans() may have allocated resources to scan columns. If we skip this row
     // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
 
     // If there is a dictionary-encoded column where every value is eliminated
@@ -624,10 +623,10 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     // At this point, the row group has passed any filtering criteria
-    // Prepare non-dictionary filtering column readers for first read and
-    // initialize their dictionaries.
-    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
-    status = InitDictionaries(non_dict_filterable_readers_);
+    // Start scanning non-dictionary filtering column readers and initialize their
+    // dictionaries.
+    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
+    status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -637,7 +636,6 @@ Status HdfsParquetScanner::NextRowGroup() {
     DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
     break;
   }
-
   DCHECK(parse_status_.ok());
   return Status::OK();
 }
@@ -695,6 +693,7 @@ void HdfsParquetScanner::PartitionReaders(
     } else {
       BaseScalarColumnReader* scalar_reader =
           static_cast<BaseScalarColumnReader*>(reader);
+      scalar_readers_.push_back(scalar_reader);
       if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
         dict_filterable_readers_.push_back(scalar_reader);
       } else {
@@ -886,7 +885,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
 
   // Any columns that were not 100% dictionary encoded need to initialize
   // their dictionaries here.
-  RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list));
+  RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
 
   return Status::OK();
 }
@@ -1236,12 +1235,15 @@ Status HdfsParquetScanner::ProcessFooter() {
         BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
 
     unique_ptr<BufferDescriptor> io_buffer;
-    RETURN_IF_ERROR(
-        io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->StartScanRange(
+          scan_node_->reader_context(), metadata_range, &needs_buffers));
+    DCHECK(!needs_buffers) << "Already provided a buffer";
+    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
     DCHECK_EQ(io_buffer->len(), metadata_size);
     DCHECK(io_buffer->eosr());
-    io_mgr->ReturnBuffer(move(io_buffer));
+    metadata_range->ReturnBuffer(move(io_buffer));
   }
 
   // Deserialize file header
@@ -1441,23 +1443,16 @@ void HdfsParquetScanner::InitCollectionColumns() {
   }
 }
 
-Status HdfsParquetScanner::InitScalarColumns(
-    int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
+Status HdfsParquetScanner::InitScalarColumns() {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
   DCHECK(file_desc != nullptr);
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
 
-  // All the scan ranges (one for each column).
-  vector<ScanRange*> col_ranges;
   // Used to validate that the number of values in each reader in column_readers_ at the
   // same SchemaElement is the same.
   unordered_map<const parquet::SchemaElement*, int> num_values_map;
-  // Used to validate we issued the right number of scan ranges
-  int num_scalar_readers = 0;
-
-  for (BaseScalarColumnReader* scalar_reader: column_readers) {
-    ++num_scalar_readers;
+  for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
     const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
     auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
     int num_values = -1;
@@ -1466,78 +1461,115 @@ Status HdfsParquetScanner::InitScalarColumns(
     } else {
       num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
     }
-    int64_t col_start = col_chunk.meta_data.data_page_offset;
-
     if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
       // TODO: improve this error message by saying which columns are different,
       // and also specify column in other error messages as appropriate
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
     }
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
+  }
+  RETURN_IF_ERROR(
+      DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
+  return Status::OK();
+}
 
-    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
-        filename(), row_group_idx, scalar_reader->col_idx(),
-        scalar_reader->schema_element(), state_));
+Status HdfsParquetScanner::DivideReservationBetweenColumns(
+    const vector<BaseScalarColumnReader*>& column_readers,
+    int64_t reservation_to_distribute) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  const int64_t min_buffer_size = io_mgr->min_buffer_size();
+  const int64_t max_buffer_size = io_mgr->max_buffer_size();
+  // The HdfsScanNode reservation calculation in the planner ensures that we have
+  // reservation for at least one buffer per column.
+  if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
+                   "least $1 bytes per column for $2 columns but had $3 bytes",
+            filename(), min_buffer_size, column_readers.size(),
+            reservation_to_distribute));
+  }
 
-    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-      // Already validated in ValidateColumnOffsets()
-      DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
-      col_start = col_chunk.meta_data.dictionary_page_offset;
-    }
-    int64_t col_len = col_chunk.meta_data.total_compressed_size;
-    if (col_len <= 0) {
-      return Status(Substitute("File '$0' contains invalid column chunk size: $1",
-          filename(), col_len));
-    }
-    int64_t col_end = col_start + col_len;
-
-    // Already validated in ValidateColumnOffsets()
-    DCHECK_GT(col_end, 0);
-    DCHECK_LT(col_end, file_desc->file_length);
-    if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
-      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
-      // dictionary page header size in total_compressed_size and total_uncompressed_size
-      // (see IMPALA-694). We pad col_len to compensate.
-      int64_t bytes_remaining = file_desc->file_length - col_end;
-      int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
-      col_len += pad;
-    }
+  vector<int64_t> col_range_lengths(column_readers.size());
+  for (int i = 0; i < column_readers.size(); ++i) {
+    col_range_lengths[i] = column_readers[i]->scan_range()->len();
+  }
+  vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
+      min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
+  for (auto& tmp_reservation : tmp_reservations) {
+    column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
+  }
+  return Status::OK();
+}
 
-    // TODO: this will need to change when we have co-located files and the columns
-    // are different files.
-    if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
-      return Status(Substitute("Expected parquet column file path '$0' to match "
-          "filename '$1'", col_chunk.file_path, filename()));
+vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+    int64_t min_buffer_size, int64_t max_buffer_size,
+    const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
+  // Pair of (column index, reservation allocated).
+  vector<pair<int, int64_t>> tmp_reservations;
+  for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
+
+  // Sort in descending order of length, breaking ties by index so that larger columns
+  // get allocated reservation first. It is common to have dramatically different column
+  // sizes in a single file because of different value sizes and compressibility. E.g.
+  // consider a large STRING "comment" field versus a highly compressible
+  // dictionary-encoded column with only a few distinct values. We want to give max-sized
+  // buffers to large columns first to maximize the size of I/Os that we do while reading
+  // this row group.
+  sort(tmp_reservations.begin(), tmp_reservations.end(),
+      [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
+        int64_t left_len = col_range_lengths[left.first];
+        int64_t right_len = col_range_lengths[right.first];
+        return left_len != right_len ? left_len > right_len : left.first < right.first;
+      });
+
+  // Set aside the minimum reservation per column.
+  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
+
+  // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
+  // or a large enough buffer to fit the remaining data for each column. Do this
+  // round-robin up to the ideal number of I/O buffers.
+  for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
+    for (auto& tmp_reservation : tmp_reservations) {
+      // Add back the reservation we set aside above.
+      if (i == 0) reservation_to_distribute += min_buffer_size;
+
+      int64_t bytes_left_in_range =
+          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
+      int64_t bytes_to_add;
+      if (bytes_left_in_range >= max_buffer_size) {
+        if (reservation_to_distribute >= max_buffer_size) {
+          bytes_to_add = max_buffer_size;
+        } else if (i == 0) {
+          DCHECK_EQ(0, tmp_reservation.second);
+          // Ensure this range gets at least one buffer on the first iteration.
+          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
+        } else {
+          DCHECK_GT(tmp_reservation.second, 0);
+          // We need to read more than the max buffer size, but can't allocate a
+          // max-sized buffer. Stop adding buffers to this column: we prefer to use
+          // the existing max-sized buffers without small buffers mixed in so that
+          // we will always do max-sized I/Os, which make efficient use of I/O devices.
+          bytes_to_add = 0;
+        }
+      } else if (bytes_left_in_range > 0 &&
+          reservation_to_distribute >= min_buffer_size) {
+        // Choose a buffer size that will fit the rest of the bytes left in the range.
+        bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
+        // But don't add more reservation than is available.
+        bytes_to_add =
+            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
+      } else {
+        bytes_to_add = 0;
+      }
+      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
+      reservation_to_distribute -= bytes_to_add;
+      tmp_reservation.second += bytes_to_add;
+      DCHECK_GE(reservation_to_distribute, 0);
+      DCHECK_GT(tmp_reservation.second, 0);
     }
-
-    const ScanRange* split_range =
-        static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
-
-    // Determine if the column is completely contained within a local split.
-    bool col_range_local = split_range->expected_local()
-        && col_start >= split_range->offset()
-        && col_end <= split_range->offset() + split_range->len();
-    ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
-        filename(), col_len, col_start, partition_id, split_range->disk_id(),
-        col_range_local,
-        BufferOpts(split_range->try_cache(), file_desc->mtime));
-    col_ranges.push_back(col_range);
-
-    // Get the stream that will be used for this column
-    ScannerContext::Stream* stream = context_->AddStream(col_range);
-    DCHECK(stream != nullptr);
-
-    RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
   }
-  DCHECK_EQ(col_ranges.size(), num_scalar_readers);
-
-  // Issue all the column chunks to the io mgr and have them scheduled immediately.
-  // This means these ranges aren't returned via DiskIoMgr::GetNextRange and
-  // instead are scheduled to be read immediately.
-  RETURN_IF_ERROR(scan_node_->runtime_state()->io_mgr()->AddScanRanges(
-      scan_node_->reader_context(), col_ranges, true));
-
-  return Status::OK();
+  return tmp_reservations;
 }
 
 Status HdfsParquetScanner::InitDictionaries(

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 7fede3b..69749f8 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -33,7 +33,7 @@ class CollectionValueBuilder;
 struct HdfsFileDesc;
 
 /// Internal schema representation and resolution.
-class SchemaNode;
+struct SchemaNode;
 
 /// Class that implements Parquet definition and repetition level decoding.
 class ParquetLevelDecoder;
@@ -68,8 +68,8 @@ class BoolColumnReader;
 /// the split size, the mid point guarantees that we have at least 50% of the row group in
 /// the current split. ProcessSplit() then computes the column ranges for these row groups
 /// and submits them to the IoMgr for immediate scheduling (so they don't surface in
-/// DiskIoMgr::GetNextRange()). Scheduling them immediately also guarantees they are all
-/// read at once.
+/// DiskIoMgr::GetNextUnstartedRange()). Scheduling them immediately also guarantees they
+/// are all read at once.
 ///
 /// Like the other scanners, each parquet scanner object is one to one with a
 /// ScannerContext. Unlike the other scanners though, the context will have multiple
@@ -327,7 +327,7 @@ class HdfsParquetScanner : public HdfsScanner {
   virtual ~HdfsParquetScanner() {}
 
   /// Issue just the footer range for each file.  We'll then parse the footer and pick
-  /// out the columns we want.
+  /// out the columns we want. 'files' must not be empty.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;
@@ -360,6 +360,7 @@ class HdfsParquetScanner : public HdfsScanner {
   template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
   friend class ScalarColumnReader;
   friend class BoolColumnReader;
+  friend class HdfsParquetScannerTest;
 
   /// Index of the current row group being processed. Initialized to -1 which indicates
   /// that we have not started processing the first row group yet (GetNext() has not yet
@@ -390,7 +391,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of scratch batches processed so far.
   int64_t row_batches_produced_;
 
-  /// Column reader for each materialized columns for this file.
+  /// Column reader for each top-level materialized slot in the output tuple.
   std::vector<ParquetColumnReader*> column_readers_;
 
   /// Column readers will write slot values into this scratch batch for
@@ -406,6 +407,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_;
 
+  /// Reservation available for scanning columns, in bytes.
+  int64_t total_col_reservation_ = 0;
+
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -422,6 +426,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// or nested within a collection.
   std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
 
+  /// Flattened list of all scalar column readers in column_readers_.
+  std::vector<BaseScalarColumnReader*> scalar_readers_;
+
   /// Flattened collection column readers that point to readers in column_readers_.
   std::vector<CollectionColumnReader*> collection_readers_;
 
@@ -562,12 +569,24 @@ class HdfsParquetScanner : public HdfsScanner {
       WARN_UNUSED_RESULT;
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
-  /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
-  /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
-  /// top-level scalar columns and readers of scalar columns within a collection node).
-  Status InitScalarColumns(
-      int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
-      WARN_UNUSED_RESULT;
+  /// initializes 'scalar_readers_' and divides reservation between the columns but
+  /// does not start any scan ranges.
+  Status InitScalarColumns() WARN_UNUSED_RESULT;
+
+  /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
+  /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
+  Status DivideReservationBetweenColumns(
+      const std::vector<BaseScalarColumnReader*>& column_readers,
+      int64_t reservation_to_distribute);
+
+  /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
+  /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
+  /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
+  /// a vector with an entry per column with the index into 'col_range_lengths' and the
+  /// amount of reservation in bytes to give to that column.
+  static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
+      int64_t min_buffer_size, int64_t max_buffer_size,
+      const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
 
   /// Initializes the column readers in collection_readers_.
   void InitCollectionColumns();
@@ -603,7 +622,8 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Partitions the readers into scalar and collection readers. The collection readers
   /// are flattened into collection_readers_. The scalar readers are partitioned into
   /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
-  /// dictionary filtering is enabled and the reader can be dictionary filtered.
+  /// dictionary filtering is enabled and the reader can be dictionary filtered. All
+  /// scalar readers are also flattened into scalar_readers_.
   void PartitionReaders(const vector<ParquetColumnReader*>& readers,
                         bool can_eval_dict_filters);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 4467da3..dc88a87 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -36,6 +36,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
@@ -65,6 +66,7 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
+      ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -81,6 +83,7 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
       disks_accessed_bitmap_(TUnit::UNIT, 0),
       active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
+  DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -110,7 +113,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
-
   return Status::OK();
 }
 
@@ -241,6 +243,16 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
   ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
 
+  // Check if reservation was enough to allocate at least one buffer. The
+  // reservation calculation in HdfsScanNode.java should guarantee this.
+  // Hitting this error indicates a misconfiguration or bug.
+  int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size();
+  if (scan_range_params_->size() > 0
+      && resource_profile_.min_reservation < min_buffer_size) {
+    return Status(TErrorCode::INTERNAL_ERROR,
+      Substitute("HDFS scan min reservation $0 must be >= min buffer size $1",
+       resource_profile_.min_reservation, min_buffer_size));
+  }
   // Add per volume stats to the runtime profile
   PerVolumeStats per_volume_stats;
   stringstream str;
@@ -327,7 +339,18 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
         partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
   }
 
-  reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker());
+  RETURN_IF_ERROR(ClaimBufferReservation(state));
+  // We got the minimum reservation. Now try to get ideal reservation.
+  if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
+    bool increased = buffer_pool_client_.IncreaseReservation(
+        ideal_scan_range_reservation_ - resource_profile_.min_reservation);
+    VLOG_FILE << "Increasing reservation from minimum "
+              << resource_profile_.min_reservation << "B to ideal "
+              << ideal_scan_range_reservation_ << "B "
+              << (increased ? "succeeded" : "failed");
+  }
+
+  reader_context_ = runtime_state_->io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
   // TODO: Revisit counters and move the counters specific to multi-threaded scans
@@ -348,14 +371,11 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  runtime_state_->io_mgr()->set_bytes_read_counter(
-      reader_context_.get(), bytes_read_counter());
-  runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), read_timer());
-  runtime_state_->io_mgr()->set_open_file_timer(reader_context_.get(), open_file_timer());
-  runtime_state_->io_mgr()->set_active_read_thread_counter(
-      reader_context_.get(), &active_hdfs_read_thread_counter_);
-  runtime_state_->io_mgr()->set_disks_access_bitmap(
-      reader_context_.get(), &disks_accessed_bitmap_);
+  reader_context_->set_bytes_read_counter(bytes_read_counter());
+  reader_context_->set_read_timer(read_timer());
+  reader_context_->set_open_file_timer(open_file_timer());
+  reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
+  reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
 
   average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
@@ -449,20 +469,30 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
     }
   }
 
-  // Issue initial ranges for all file types.
-  RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::PARQUET]));
-  RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::ORC]));
-  RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::TEXT]));
-  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
-  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::RC_FILE]));
-  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::AVRO]));
-
+  // Issue initial ranges for all file types. Only call functions for file types that
+  // actually exist - trying to add empty lists of ranges can result in spurious
+  // CANCELLED errors - see IMPALA-6564.
+  for (const auto& entry : matching_per_type_files) {
+    if (entry.second.empty()) continue;
+    switch (entry.first) {
+      case THdfsFileFormat::PARQUET:
+        RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second));
+        break;
+      case THdfsFileFormat::TEXT:
+        RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, entry.second));
+        break;
+      case THdfsFileFormat::SEQUENCE_FILE:
+      case THdfsFileFormat::RC_FILE:
+      case THdfsFileFormat::AVRO:
+        RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, entry.second));
+        break;
+      case THdfsFileFormat::ORC:
+        RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this, entry.second));
+        break;
+      default:
+        DCHECK(false) << "Unexpected file type " << entry.first;
+    }
+  }
   return Status::OK();
 }
 
@@ -521,6 +550,8 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 
 Status HdfsScanNodeBase::AddDiskIoRanges(
     const vector<ScanRange*>& ranges, int num_files_queued) {
+  DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
+  DCHECK_GT(ranges.size(), 0);
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
@@ -829,20 +860,14 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
       Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
 
   if (reader_context_ != nullptr) {
-    bytes_read_local_->Set(
-        runtime_state_->io_mgr()->bytes_read_local(reader_context_.get()));
-    bytes_read_short_circuit_->Set(
-        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get()));
-    bytes_read_dn_cache_->Set(
-        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get()));
-    num_remote_ranges_->Set(static_cast<int64_t>(
-        runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get())));
-    unexpected_remote_bytes_->Set(
-        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get()));
-    cached_file_handles_hit_count_->Set(
-        runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get()));
+    bytes_read_local_->Set(reader_context_->bytes_read_local());
+    bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
+    bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+    num_remote_ranges_->Set(reader_context_->num_remote_ranges());
+    unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
+    cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
     cached_file_handles_miss_count_->Set(
-        runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get()));
+        reader_context_->cached_file_handles_miss_count());
 
     if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
       runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 70fbac2..3a9c37f 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -322,6 +322,9 @@ class HdfsScanNodeBase : public ScanNode {
   friend class ScannerContext;
   friend class HdfsScanner;
 
+  /// Ideal reservation to process each input split, computed by the planner.
+  const int64_t ideal_scan_range_reservation_;
+
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
   const int min_max_tuple_id_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index d143e91..2786742 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -26,6 +26,7 @@
 
 #include "gen-cpp/PlanNodes_types.h"
 
+using namespace impala::io;
 using std::stringstream;
 
 namespace impala {
@@ -77,20 +78,27 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_->Close(row_batch);
       scanner_.reset();
     }
-    RETURN_IF_ERROR(
-        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range_));
-    if (scan_range_ == NULL) {
+    DiskIoMgr* io_mgr = runtime_state_->io_mgr();
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
+        reader_context_.get(), &scan_range_, &needs_buffers));
+    if (scan_range_ == nullptr) {
       *eos = true;
       StopAndFinalizeCounters();
       return Status::OK();
     }
+    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
+    if (needs_buffers) {
+      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
+          &buffer_pool_client_, scan_range_, scanner_reservation));
+    }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
-    scanner_ctx_.reset(new ScannerContext(
-        runtime_state_, this, partition, scan_range_, filter_ctxs(),
-        expr_results_pool()));
+    scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
+        partition, filter_ctxs(), expr_results_pool()));
+    scanner_ctx_->AddStream(scan_range_, scanner_reservation);
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {
       DCHECK(scanner_ == NULL);

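For context: the GetNext() change above is the template for every consumer of the
rewritten I/O mgr. A minimal sketch of the new handshake, with error handling
elided ('reader_ctx', 'bp_client' and 'reservation_bytes' are placeholder names,
not identifiers from this patch):

    ScanRange* range;
    bool needs_buffers;
    // The I/O mgr no longer allocates buffers eagerly; the out-parameter tells
    // the caller whether it must attach buffers before the range can be read.
    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(reader_ctx, &range, &needs_buffers));
    if (range == nullptr) { /* all ranges started: flag eos */ }
    if (needs_buffers) {
      // Buffers are allocated against the caller's buffer pool client, so scan
      // memory is bounded by the reservation the caller passes in.
      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
          reader_ctx, bp_client, range, reservation_bytes));
    }
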
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 045ae18..e8dac08 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -26,6 +26,7 @@
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/io/request-context.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
@@ -46,16 +47,6 @@ DECLARE_bool(skip_file_runtime_filtering);
 using namespace impala;
 using namespace impala::io;
 
-// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
-// The memory used does not vary considerably between file formats (just a couple of MBs).
-// This value is conservative and taken from running against the tpch lineitem table.
-// TODO: revisit how we do this.
-const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
-
-// Estimated upper bound on the compression ratio of compressed text files. Used to
-// estimate scanner thread memory usage.
-const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
-
 // Amount of time to block waiting for GetNext() to release scanner threads between
 // checking if a scanner thread should yield itself back to the global thread pool.
 const int SCANNER_THREAD_WAIT_TIME_MS = 20;
@@ -63,11 +54,6 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20;
 HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
     : HdfsScanNodeBase(pool, tnode, descs),
-      ranges_issued_barrier_(1),
-      scanner_thread_bytes_required_(0),
-      done_(false),
-      all_ranges_started_(false),
-      thread_avail_cb_id_(-1),
       max_num_scanner_threads_(CpuInfo::num_cores()) {
 }
 
@@ -168,36 +154,6 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
-
-  // Compute the minimum bytes required to start a new thread. This is based on the
-  // file format.
-  // The higher the estimate, the less likely it is the query will fail but more likely
-  // the query will be throttled when it does not need to be.
-  // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to
-  // one but if there are already buffers queued before memory pressure was hit, we can't
-  // reclaim that memory.
-  if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
-    // Parquet files require buffers per column
-    scanner_thread_bytes_required_ =
-        materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size();
-  } else {
-    scanner_thread_bytes_required_ =
-        3 * runtime_state_->io_mgr()->max_read_buffer_size();
-  }
-  // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
-  // Next we add in the other memory the scanner thread will use.
-  // e.g. decompression buffers, tuple buffers, etc.
-  // For compressed text, we estimate this based on the file size (since the whole file
-  // will need to be decompressed at once). For all other formats, we use a constant.
-  // TODO: can we do something better?
-  int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
-  for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) {
-    if (file->file_compression != THdfsCompression::NONE) {
-      int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
-      scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
-    }
-  }
-  scanner_thread_bytes_required_ += scanner_thread_mem_usage;
   row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
   return Status::OK();
@@ -216,10 +172,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
     max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
   DCHECK_GT(max_num_scanner_threads_, 0);
-
+  spare_reservation_.Store(buffer_pool_client_.GetReservation());
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
-
   return Status::OK();
 }
 
@@ -260,37 +215,28 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
-// For controlling the amount of memory used for scanners, we approximate the
-// scanner mem usage based on scanner_thread_bytes_required_, rather than the
-// consumption in the scan node's mem tracker. The problem with the scan node
-// trackers is that it does not account for the memory the scanner will use.
-// For example, if there is 110 MB of memory left (based on the mem tracker)
-// and we estimate that a scanner will use 100MB, we want to make sure to only
-// start up one additional thread. However, after starting the first thread, the
-// mem tracker value will not change immediately (it takes some time before the
-// scanner is running and starts using memory). Therefore we just use the estimate
-// based on the number of running scanner threads.
-bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
-  int64_t committed_scanner_mem =
-      active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
-  int64_t tracker_consumption = mem_tracker()->consumption();
-  int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
-  if (est_additional_scanner_mem < 0) {
-    // This is the case where our estimate was too low. Expand the estimate based
-    // on the usage.
-    int64_t avg_consumption =
-        tracker_consumption / active_scanner_thread_counter_.value();
-    // Take the average and expand it by 50%. Some scanners will not have hit their
-    // steady state mem usage yet.
-    // TODO: how can we scale down if we've overestimated.
-    // TODO: better heuristic?
-    scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
-    est_additional_scanner_mem = 0;
-  }
+bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
+  int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
+  if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
+  spare_reservation_.Add(increase);
+  return true;
+}
 
-  // If we are starting a new thread, take that into account now.
-  if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
-  return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
+int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
+    bool first_thread) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  int64_t amount;
+  if (first_thread) {
+    amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
+        ideal_scan_range_reservation_ : resource_profile_.min_reservation;
+  } else {
+    amount = ideal_scan_range_reservation_;
+  }
+  int64_t remainder = spare_reservation_.Add(-amount);
+  DCHECK_GE(remainder, 0);
+  return amount;
 }
 
 void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
@@ -320,17 +266,19 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     // TODO: This still leans heavily on starvation-free locks, come up with a more
     // correct way to communicate between this method and ScannerThreadHelper
     unique_lock<mutex> lock(lock_);
+
+    const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
+    const bool first_thread = num_active_scanner_threads == 0;
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
-        active_scanner_thread_counter_.value() >= progress_.remaining()) {
+        num_active_scanner_threads >= progress_.remaining()) {
       break;
     }
 
-    bool first_thread = active_scanner_thread_counter_.value() == 0;
     // Cases 5 and 6.
     if (!first_thread &&
         (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
-         !EnoughMemoryForScannerThread(true))) {
+         !EnoughReservationForExtraThread(lock))) {
       break;
     }
 
@@ -343,16 +291,23 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       break;
     }
 
+    // Deduct the reservation. We haven't dropped the lock since the
+    // first_thread/EnoughReservationForExtraThread() checks, so spare reservation
+    // must be available.
+    int64_t scanner_thread_reservation =
+        DeductReservationForScannerThread(lock, first_thread);
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
-
-    auto fn = [this, first_thread]() { this->ScannerThread(first_thread); };
+    auto fn = [this, first_thread, scanner_thread_reservation]() {
+      this->ScannerThread(first_thread, scanner_thread_reservation);
+    };
     std::unique_ptr<Thread> t;
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
+      ReturnReservationFromScannerThread(scanner_thread_reservation);
       COUNTER_ADD(&active_scanner_thread_counter_, -1);
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
@@ -373,9 +328,10 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   }
 }
 
-void HdfsScanNode::ScannerThread(bool first_thread) {
+void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reservation) {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
+  DiskIoMgr* io_mgr = runtime_state_->io_mgr();
 
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
@@ -401,8 +357,7 @@ void HdfsScanNode::ScannerThread(bool first_thread) {
       // this thread.
       unique_lock<mutex> l(lock_);
       if (active_scanner_thread_counter_.value() > 1) {
-        if (runtime_state_->resource_pool()->optional_exceeded() ||
-            !EnoughMemoryForScannerThread(false)) {
+        if (runtime_state_->resource_pool()->optional_exceeded()) {
           // We can't break here. We need to update the counter with the lock held or else
           // all threads might see active_scanner_thread_counter_.value > 1
           COUNTER_ADD(&active_scanner_thread_counter_, -1);
@@ -422,21 +377,29 @@ void HdfsScanNode::ScannerThread(bool first_thread) {
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    ScanRange* scan_range;
-    // Take a snapshot of num_unqueued_files_ before calling GetNextRange().
+    // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange().
     // We don't want num_unqueued_files_ to go to zero between the return from
-    // GetNextRange() and the check for when all ranges are complete.
+    // GetNextUnstartedRange() and the check for when all ranges are complete.
     int num_unqueued_files = num_unqueued_files_.Load();
     // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any earlier
     // stores that need to complete?)
     AtomicUtil::MemoryBarrier();
+    ScanRange* scan_range;
+    bool needs_buffers;
     Status status =
-        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range);
+        io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers);
 
-    if (status.ok() && scan_range != NULL) {
-      // Got a scan range. Process the range end to end (in this thread).
-      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-          &expr_results_pool, scan_range);
+    if (status.ok() && scan_range != nullptr) {
+      if (needs_buffers) {
+        status = io_mgr->AllocateBuffersForRange(
+            reader_context_.get(), &buffer_pool_client_, scan_range,
+            scanner_thread_reservation);
+      }
+      if (status.ok()) {
+        // Got a scan range. Process the range end to end (in this thread).
+        status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+            &expr_results_pool, scan_range, scanner_thread_reservation);
+      }
     }
 
     if (!status.ok()) {
@@ -466,8 +429,8 @@ void HdfsScanNode::ScannerThread(bool first_thread) {
       // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
       // needed to acquire the lock in x86.
       unique_lock<mutex> l(lock_);
-      // All ranges have been queued and GetNextRange() returned NULL. This means that
-      // every range is either done or being processed by another thread.
+      // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means
+      // that every range is either done or being processed by another thread.
       all_ranges_started_ = true;
       break;
     }
@@ -475,6 +438,7 @@ void HdfsScanNode::ScannerThread(bool first_thread) {
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
+  ReturnReservationFromScannerThread(scanner_thread_reservation);
   runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
@@ -482,7 +446,8 @@ exit:
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
-    MemPool* expr_results_pool, ScanRange* scan_range) {
+    MemPool* expr_results_pool, ScanRange* scan_range,
+    int64_t scanner_thread_reservation) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -507,8 +472,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     return Status::OK();
   }
 
-  ScannerContext context(
-      runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
+  ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
+      filter_ctxs, expr_results_pool);
+  context.AddStream(scan_range, scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {
@@ -550,9 +516,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
 void HdfsScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
-  if (reader_context_ != nullptr) {
-    runtime_state_->io_mgr()->CancelContext(reader_context_.get());
-  }
+  if (reader_context_ != nullptr) reader_context_->Cancel();
   materialized_row_batches_->Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index a9be94e..2dfbb10 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -27,6 +27,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "exec/filter-context.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -59,8 +60,14 @@ class TPlanNode;
 /// 5. The scanner finishes the scan range and informs the scan node so it can track
 ///    end of stream.
 ///
-/// TODO: This class allocates a bunch of small utility objects that should be
-/// recycled.
+/// Buffer management:
+/// ------------------
+/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
+/// client. The scan node ensures that enough reservation is available to start a
+/// scanner thread before launching each one with (see
+/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
+/// for staying within the reservation handed off to it.
+///
 /// TODO: Remove this class once the fragment-based multi-threaded execution is
 /// fully functional.
 class HdfsScanNode : public HdfsScanNodeBase {
@@ -101,12 +108,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
  private:
   /// Released when initial ranges are issued in the first call to GetNext().
-  CountingBarrier ranges_issued_barrier_;
-
-  /// The estimated memory required to start up a new scanner thread. If the memory
-  /// left (due to limits) is less than this value, we won't start up optional
-  /// scanner threads.
-  int64_t scanner_thread_bytes_required_;
+  CountingBarrier ranges_issued_barrier_{1};
 
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
@@ -131,16 +133,16 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// are finished, an error/cancellation occurred, or the limit was reached.
   /// Setting this to true triggers the scanner threads to clean up.
   /// This should not be explicitly set. Instead, call SetDone().
-  bool done_;
+  bool done_ = false;
 
   /// Set to true if all ranges have started. Some of the ranges may still be in flight
   /// being processed by scanner threads, but no new ScannerThreads should be started.
-  bool all_ranges_started_;
+  bool all_ranges_started_ = false;
 
   /// The id of the callback added to the thread resource manager when thread token
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
-  int thread_avail_cb_id_;
+  int thread_avail_cb_id_ = -1;
 
   /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
   /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
@@ -148,6 +150,14 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// the number of cores.
   int max_num_scanner_threads_;
 
+  /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
+  /// threads. Doled out to scanner threads when they are started and returned when
+  /// those threads no longer need it. Can be atomically incremented without holding
+  /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
+  /// reservation and the actual deduction are atomic with respect to other threads
+  /// trying to claim reservation.
+  AtomicInt64 spare_reservation_{0};
+
   /// The wait time for fetching a row batch from the row batch queue.
   RuntimeProfile::Counter* row_batches_get_timer_;
 
@@ -163,21 +173,35 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// This thread terminates when all scan ranges are complete or an error occurred.
   /// 'first_thread' is true if this was the first scanner thread to start and
   /// it acquired a "required" thread token from ThreadResourceMgr.
-  void ScannerThread(bool first_thread);
+  /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
+  /// this thread with DeductReservationForScannerThread().
+  void ScannerThread(bool first_thread, int64_t scanner_thread_reservation);
 
   /// Process the entire scan range with a new scanner object. Executed in scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
-
-  /// Returns true if there is enough memory (against the mem tracker limits) to
-  /// have a scanner thread.
-  /// If new_thread is true, the calculation is for starting a new scanner thread.
-  /// If false, it determines whether there's adequate memory for the existing
-  /// set of scanner threads.
-  /// lock_ must be taken before calling this.
-  bool EnoughMemoryForScannerThread(bool new_thread);
+      MemPool* expr_results_pool, io::ScanRange* scan_range,
+      int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
+
+  /// Return true if there is enough reservation to start an extra scanner thread.
+  /// Tries to increase reservation if enough is not already available in
+  /// 'spare_reservation_'. 'lock_' must be held via 'lock'
+  bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
+
+  /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
+  /// 'first_thread' is true, this is the first thread to be started and only the
+  /// minimum reservation is required to be available. Otherwise
+  /// EnoughReservationForExtraThread() must have returned true in the current
+  /// critical section so that 'ideal_scan_range_reservation_' is available for the extra
+  /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
+  int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
+      bool first_thread);
+
+  /// Called by a scanner thread to return all or part of its unneeded reservation.
+  void ReturnReservationFromScannerThread(int64_t bytes) {
+    spare_reservation_.Add(bytes);
+  }
 
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)

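Putting the new members and helpers together, the reservation handoff in this
node follows a simple lifecycle. A sketch using the names above (locking, thread
creation and error paths elided; illustrative rather than the exact code):

    // ThreadTokenAvailableCb(), under lock_: admit an extra thread only if
    // spare reservation exists or can be grown, then carve out its share.
    if (!first_thread && !EnoughReservationForExtraThread(lock)) break;
    int64_t reservation = DeductReservationForScannerThread(lock, first_thread);

    // ScannerThread(): 'reservation' bounds the buffers this thread may
    // allocate, and is handed back in full at the thread's exit label.
    ReturnReservationFromScannerThread(reservation);

Ignoring any reservation the scanners themselves acquire, the invariant is that
'spare_reservation_' plus the amounts held by live scanner threads equals
buffer_pool_client_.GetReservation().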
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index d191d3f..2fd488f 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -779,6 +779,7 @@ void HdfsScanner::CheckFiltersEffectiveness() {
 
 Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
     const THdfsFileFormat::type& file_type, const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   vector<ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
     // Compute the offset of the file footer.
@@ -828,7 +829,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
   }
   // The threads that process the footer will also do the scan, so we mark all the files
   // as complete here.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  if (footer_ranges.size() > 0) {
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index b78115d..1b08c97 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -144,8 +144,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         DCHECK(false);
     }
   }
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
-          compressed_text_files));
+  if (compressed_text_scan_ranges.size() > 0) {
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+        compressed_text_files));
+  }
   if (lzo_text_files.size() > 0) {
     // This will dlopen the lzo binary and can fail if the lzo binary is not present.
     RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));
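
Together with the DCHECK_GT(ranges.size(), 0) added to
HdfsScanNodeBase::AddDiskIoRanges() and the DCHECK(!files.empty()) in
IssueFooterRanges(), these guards spell out the new contract: a format that
produced no scan ranges must skip the call entirely rather than rely on the
I/O mgr to tolerate an empty list. At a callsite the pattern is simply (sketch;
'num_files' is a placeholder):

    if (!ranges.empty()) {
      RETURN_IF_ERROR(scan_node->AddDiskIoRanges(ranges, num_files));
    }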

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 685b030..f5b73c9 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -30,6 +30,7 @@
 #include "exec/scanner-context.inline.h"
 #include "rpc/thrift-util.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/runtime-state.h"
@@ -47,6 +48,10 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
     "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
     "be converted from UTC to local time. Writes are unaffected.");
 
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+static const int MAX_DICT_HEADER_SIZE = 100;
+
 // Max data page header size in bytes. This is an estimate and only needs to be an upper
 // bound. It is theoretically possible to have a page header of any size due to string
 // value statistics, but in practice we'll have trouble reading string values this large.
@@ -66,6 +71,8 @@ static int debug_count = 0;
 #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
 #endif
 
+using namespace impala::io;
+
 namespace impala {
 
 const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
@@ -835,8 +842,98 @@ static bool RequiresSkippedDictionaryHeaderCheck(
   return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
 }
 
+Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
+    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
+  // Ensure metadata is valid before using it to initialize the reader.
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
+      parent_->filename(), row_group_idx, col_idx(), schema_element(),
+      parent_->state_));
+  num_buffered_values_ = 0;
+  data_ = nullptr;
+  data_end_ = nullptr;
+  stream_ = nullptr;
+  io_reservation_ = 0;
+  metadata_ = &col_chunk.meta_data;
+  num_values_read_ = 0;
+  def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+  // See ColumnReader constructor.
+  rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
+  pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+
+  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+    RETURN_IF_ERROR(Codec::CreateDecompressor(
+        nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
+  }
+  int64_t col_start = col_chunk.meta_data.data_page_offset;
+  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+    // Already validated in ValidateColumnOffsets()
+    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+    col_start = col_chunk.meta_data.dictionary_page_offset;
+  }
+  int64_t col_len = col_chunk.meta_data.total_compressed_size;
+  if (col_len <= 0) {
+    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+        filename(), col_len));
+  }
+  int64_t col_end = col_start + col_len;
+
+  // Already validated in ValidateColumnOffsets()
+  DCHECK_GT(col_end, 0);
+  DCHECK_LT(col_end, file_desc.file_length);
+  const ParquetFileVersion& file_version = parent_->file_version_;
+  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
+    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+    // dictionary page header size in total_compressed_size and total_uncompressed_size
+    // (see IMPALA-694). We pad col_len to compensate.
+    int64_t bytes_remaining = file_desc.file_length - col_end;
+    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+    col_len += pad;
+  }
+
+  // TODO: this will need to change when we have co-located files and the columns
+  // are in different files.
+  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+    return Status(Substitute("Expected parquet column file path '$0' to match "
+        "filename '$1'", col_chunk.file_path, filename()));
+  }
+
+  const ScanRange* metadata_range = parent_->metadata_range_;
+  int64_t partition_id = parent_->context_->partition_descriptor()->id();
+  const ScanRange* split_range =
+      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+  // Determine if the column is completely contained within a local split.
+  bool col_range_local = split_range->expected_local()
+      && col_start >= split_range->offset()
+      && col_end <= split_range->offset() + split_range->len();
+  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
+      filename(), col_len, col_start, partition_id, split_range->disk_id(),
+      col_range_local,
+      BufferOpts(split_range->try_cache(), file_desc.mtime));
+  ClearDictionaryDecoder();
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::StartScan() {
+  DCHECK(scan_range_ != nullptr) << "Must call Reset() before starting scan.";
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  ScannerContext* context = parent_->context_;
+  DCHECK_GT(io_reservation_, 0);
+  bool needs_buffers;
+  RETURN_IF_ERROR(io_mgr->StartScanRange(
+      parent_->scan_node_->reader_context(), scan_range_, &needs_buffers));
+  if (needs_buffers) {
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+        parent_->scan_node_->reader_context(), context->bp_client(),
+        scan_range_, io_reservation_));
+  }
+  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
+  DCHECK(stream_ != nullptr);
+  return Status::OK();
+}
+
 Status BaseScalarColumnReader::ReadPageHeader(bool peek,
     parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
+  DCHECK(stream_ != nullptr);
   *eos = false;
 
   uint8_t* buffer;
@@ -920,7 +1017,7 @@ Status BaseScalarColumnReader::InitDictionary() {
   bool eos;
   parquet::PageHeader next_page_header;
   uint32_t next_header_size;
-
+  DCHECK(stream_ != nullptr);
   DCHECK(!HasDictionaryDecoder());
 
   RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
@@ -1032,6 +1129,14 @@ Status BaseScalarColumnReader::InitDictionary() {
   return Status::OK();
 }
 
+Status BaseScalarColumnReader::InitDictionaries(
+    const vector<BaseScalarColumnReader*> readers) {
+  for (BaseScalarColumnReader* reader : readers) {
+    RETURN_IF_ERROR(reader->InitDictionary());
+  }
+  return Status::OK();
+}
+
 Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page.  The previous data page is
   // now complete, so free up any memory allocated for it. If the data page contained