You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/03 06:25:34 UTC

[11/11] impala git commit: Revert IMPALA-4835 and dependent changes

Revert IMPALA-4835 and dependent changes

Revert "IMPALA-6585: increase test_low_mem_limit_q21 limit"

This reverts commit 25bcb258dfd712f1514cf188206667a5e6be0e26.

Revert "IMPALA-6588: don't add empty list of ranges in text scan"

This reverts commit d57fbec6f67b83227b4c6117476da8f7d75fc4f6.

Revert "IMPALA-4835: Part 3: switch I/O buffers to buffer pool"

This reverts commit 24b4ed0b29a44090350e630d625291c01b753a36.

Revert "IMPALA-4835: Part 2: Allocate scan range buffers upfront"

This reverts commit 5699b59d0c5cbe37e888a367adb42fa12dfb0916.

Revert "IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation"

This reverts commit 65680dc42107db4ff2273c635cedf83d20f0ea94.

Change-Id: Ie5ca451cd96602886b0a8ecaa846957df0269cbb
Reviewed-on: http://gerrit.cloudera.org:8080/9480
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/161cbe30
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/161cbe30
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/161cbe30

Branch: refs/heads/master
Commit: 161cbe30ff0cba70e48fe48d41e0d76ec5273264
Parents: a2c9a7c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Mar 2 16:09:25 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Mar 3 04:22:12 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |    1 -
 be/src/exec/base-sequence-scanner.cc            |    1 -
 be/src/exec/base-sequence-scanner.h             |    3 +-
 be/src/exec/hdfs-lzo-text-scanner.cc            |    1 -
 be/src/exec/hdfs-parquet-scanner-test.cc        |   96 -
 be/src/exec/hdfs-parquet-scanner.cc             |  234 +--
 be/src/exec/hdfs-parquet-scanner.h              |   44 +-
 be/src/exec/hdfs-scan-node-base.cc              |   95 +-
 be/src/exec/hdfs-scan-node-base.h               |    3 -
 be/src/exec/hdfs-scan-node-mt.cc                |   20 +-
 be/src/exec/hdfs-scan-node.cc                   |  172 +-
 be/src/exec/hdfs-scan-node.h                    |   66 +-
 be/src/exec/hdfs-text-scanner.cc                |    6 +-
 be/src/exec/parquet-column-readers.cc           |  108 +-
 be/src/exec/parquet-column-readers.h            |   88 +-
 be/src/exec/scanner-context.cc                  |   42 +-
 be/src/exec/scanner-context.h                   |   54 +-
 be/src/runtime/bufferpool/buffer-pool.h         |    1 -
 .../bufferpool/reservation-tracker-test.cc      |    8 +-
 be/src/runtime/bufferpool/reservation-util.cc   |    2 +-
 be/src/runtime/exec-env.cc                      |    7 +-
 be/src/runtime/io/disk-io-mgr-internal.h        |   16 -
 be/src/runtime/io/disk-io-mgr-stress-test.cc    |   43 +-
 be/src/runtime/io/disk-io-mgr-stress.cc         |   89 +-
 be/src/runtime/io/disk-io-mgr-stress.h          |   26 +-
 be/src/runtime/io/disk-io-mgr-test.cc           |  849 ++++----
 be/src/runtime/io/disk-io-mgr.cc                |  716 +++++--
 be/src/runtime/io/disk-io-mgr.h                 |  383 ++--
 be/src/runtime/io/request-context.cc            |  239 +--
 be/src/runtime/io/request-context.h             |  318 ++-
 be/src/runtime/io/request-ranges.h              |  196 +-
 be/src/runtime/io/scan-range.cc                 |  309 ++-
 be/src/runtime/mem-tracker.h                    |    1 +
 be/src/runtime/test-env.cc                      |    2 +-
 be/src/runtime/tmp-file-mgr-test.cc             |    3 +-
 be/src/runtime/tmp-file-mgr.cc                  |   18 +-
 be/src/runtime/tmp-file-mgr.h                   |   17 +-
 be/src/util/bit-util-test.cc                    |   11 -
 be/src/util/bit-util.h                          |    8 +-
 be/src/util/impalad-metrics.cc                  |   13 +-
 be/src/util/impalad-metrics.h                   |    9 +
 common/thrift/PlanNodes.thrift                  |    3 -
 .../apache/impala/analysis/SlotDescriptor.java  |   19 -
 .../org/apache/impala/analysis/SlotRef.java     |   20 +
 .../org/apache/impala/planner/HdfsScanNode.java |  167 +-
 .../java/org/apache/impala/util/BitUtil.java    |    6 -
 .../org/apache/impala/util/BitUtilTest.java     |    6 -
 .../queries/PlannerTest/constant-folding.test   |   42 +-
 .../queries/PlannerTest/disable-codegen.test    |   24 +-
 .../PlannerTest/fk-pk-join-detection.test       |   78 +-
 .../queries/PlannerTest/max-row-size.test       |   80 +-
 .../PlannerTest/min-max-runtime-filters.test    |    6 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   40 +-
 .../queries/PlannerTest/parquet-filtering.test  |   42 +-
 .../queries/PlannerTest/partition-pruning.test  |    4 +-
 .../PlannerTest/resource-requirements.test      | 1814 ++++--------------
 .../PlannerTest/sort-expr-materialization.test  |   32 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  192 +-
 .../queries/PlannerTest/tablesample.test        |   44 +-
 .../queries/PlannerTest/union.test              |    8 +-
 .../admission-reject-min-reservation.test       |   12 +-
 .../queries/QueryTest/analytic-fns.test         |    5 +-
 .../queries/QueryTest/codegen-mem-limit.test    |    5 +-
 .../QueryTest/disk-spill-encryption.test        |    2 +-
 .../queries/QueryTest/explain-level0.test       |    4 +-
 .../queries/QueryTest/explain-level1.test       |    4 +-
 .../queries/QueryTest/explain-level2.test       |   14 +-
 .../queries/QueryTest/explain-level3.test       |   14 +-
 .../queries/QueryTest/nested-types-tpch.test    |    6 +-
 .../queries/QueryTest/runtime_row_filters.test  |   11 +-
 .../queries/QueryTest/scanners.test             |    7 -
 .../functional-query/queries/QueryTest/set.test |    2 +-
 .../queries/QueryTest/spilling-aggs.test        |   19 +-
 .../spilling-naaj-no-deny-reservation.test      |    7 +-
 .../queries/QueryTest/spilling-naaj.test        |    8 +-
 .../QueryTest/spilling-no-debug-action.test     |   66 -
 .../QueryTest/spilling-sorts-exhaustive.test    |   10 +-
 .../queries/QueryTest/spilling.test             |   76 +-
 .../queries/QueryTest/stats-extrapolation.test  |   52 +-
 .../tpch/queries/sort-reservation-usage.test    |    9 +-
 tests/common/test_dimensions.py                 |   16 +-
 tests/custom_cluster/test_scratch_disk.py       |    2 +-
 tests/query_test/test_mem_usage_scaling.py      |   11 +-
 tests/query_test/test_query_mem_limit.py        |    4 +-
 tests/query_test/test_scanners.py               |   25 +-
 tests/query_test/test_scanners_fuzz.py          |    9 +-
 tests/query_test/test_sort.py                   |   16 +-
 tests/query_test/test_spilling.py               |    5 -
 88 files changed, 2801 insertions(+), 4565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c1f91d6..aab1383 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -108,4 +108,3 @@ ADD_BE_TEST(parquet-version-test)
 ADD_BE_TEST(row-batch-list-test)
 ADD_BE_TEST(incr-stats-util-test)
 ADD_BE_TEST(hdfs-avro-scanner-test)
-ADD_BE_TEST(hdfs-parquet-scanner-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9d95b0b..9cb6330 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -46,7 +46,6 @@ static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes
 
 Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
-  DCHECK(!files.empty());
   // Issue just the header range for each file.  When the header is complete,
   // we'll issue the splits for that file.  Splits cannot be processed until the
   // header is parsed (the header object is then shared across splits for that file).

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 3c2326e..887ff6f 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -47,8 +47,7 @@ class ScannerContext;
 /// situation, causing the block to be incorrectly skipped.
 class BaseSequenceScanner : public HdfsScanner {
  public:
-  /// Issue the initial ranges for all sequence container files. 'files' must not be
-  /// empty.
+  /// Issue the initial ranges for all sequence container files.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc
index 8af89f2..88ae295 100644
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ b/be/src/exec/hdfs-lzo-text-scanner.cc
@@ -62,7 +62,6 @@ HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
 
 Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
-  DCHECK(!files.empty());
   if (LzoIssueInitialRanges == NULL) {
     lock_guard<SpinLock> l(lzo_load_lock_);
     if (library_load_status_.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
deleted file mode 100644
index cbc6e76..0000000
--- a/be/src/exec/hdfs-parquet-scanner-test.cc
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-parquet-scanner.h"
-#include "testutil/gtest-util.h"
-
-#include "common/names.h"
-
-static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
-static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
-
-namespace impala {
-
-class HdfsParquetScannerTest : public testing::Test {
- protected:
-  void TestDivideReservation(const vector<int64_t>& col_range_lengths,
-      int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
-};
-
-/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
-/// inputs 'col_range_lengths' and 'total_col_reservation'.
-void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
-      int64_t total_col_reservation, const vector<int64_t>& expected_reservations) {
-  vector<pair<int, int64_t>> reservations =
-      HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
-      MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation);
-  for (int i = 0; i < reservations.size(); ++i) {
-    LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second;
-  }
-  EXPECT_EQ(reservations.size(), expected_reservations.size());
-  vector<bool> present(expected_reservations.size(), false);
-  for (auto& reservation: reservations) {
-    // Ensure that each appears exactly once.
-    EXPECT_FALSE(present[reservation.first]);
-    present[reservation.first] = true;
-    EXPECT_EQ(expected_reservations[reservation.first], reservation.second)
-        << reservation.first;
-  }
-}
-
-TEST_F(HdfsParquetScannerTest, DivideReservation) {
-  // Test a long scan ranges with lots of memory - should allocate 3 max-size
-  // buffers per range.
-  TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE});
-  TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024,
-        {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
-
-  // Long scan ranges, not enough memory for 3 buffers each. Should only allocate
-  // max-sized buffers, preferring the longer scan range.
-  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE,
-        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
-  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024},
-        5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE,
-        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
-  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1,
-        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
-
-  // Test a short range with lots of memory - should round up buffer size.
-  TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024});
-
-  // Test a range << MIN_BUFFER_SIZE - should round up to buffer size.
-  TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE});
-
-  // Test long ranges with limited memory.
-  TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE});
-  TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE});
-  TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE});
-  TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2});
-  TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE},
-      3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE});
-
-  // Test a mix of scan range lengths larger than and smaller than the max I/O buffer
-  // size. Long ranges get allocated most memory.
-  TestDivideReservation(
-      {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304},
-      25165824,
-      {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144});
-}
-
-}
-
-IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 574ddb0..e279369 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,7 +17,6 @@
 
 #include "exec/hdfs-parquet-scanner.h"
 
-#include <algorithm>
 #include <queue>
 
 #include <gutil/strings/substitute.h>
@@ -28,7 +27,6 @@
 #include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
 #include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/runtime-filter.inline.h"
@@ -37,7 +35,6 @@
 #include "common/names.h"
 
 using std::move;
-using std::sort;
 using namespace impala;
 using namespace impala::io;
 
@@ -50,6 +47,10 @@ constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
 static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
     "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
 
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+const int MAX_DICT_HEADER_SIZE = 100;
+
 // Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
 // has fewer entries, then the entire column is dictionary encoded. This threshold
 // is guaranteed to be true for Impala versions 2.9 or below.
@@ -68,7 +69,6 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
-  DCHECK(!files.empty());
   vector<ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
@@ -98,7 +98,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
             static_cast<ScanRangeMetadata*>(split->meta_data());
         // Each split is processed by first issuing a scan range for the file footer, which
         // is done here, followed by scan ranges for the columns of each row group within
-        // the actual split (see InitScalarColumns()). The original split is stored in the
+        // the actual split (in InitColumns()). The original split is stored in the
         // metadata associated with the footer range.
         ScanRange* footer_range;
         if (footer_split != nullptr) {
@@ -121,13 +121,9 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       }
     }
   }
-  // We may not have any scan ranges if this scan node does not have the footer split for
-  // any Parquet file.
-  if (footer_ranges.size() > 0) {
-    // The threads that process the footer will also do the scan, so we mark all the files
-    // as complete here.
-    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
-  }
+  // The threads that process the footer will also do the scan, so we mark all the files
+  // as complete here.
+  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
   return Status::OK();
 }
 
@@ -232,14 +228,9 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
-  int64_t stream_reservation = stream_->reservation();
-  stream_ = nullptr;
   context_->ReleaseCompletedResources(true);
   context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
-  // The scanner-wide stream was used only to read the file footer.  Each column has added
-  // its own stream. We can use the reservation from 'stream_' for the columns now.
-  total_col_reservation_ = stream_reservation;
 
   // Parse the file schema into an internal representation for schema resolution.
   schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -256,6 +247,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
 
   RETURN_IF_ERROR(InitDictFilterStructures());
+
+  // The scanner-wide stream was used only to read the file footer.  Each column has added
+  // its own stream.
+  stream_ = nullptr;
   return Status::OK();
 }
 
@@ -679,13 +674,15 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     InitCollectionColumns();
-    RETURN_IF_ERROR(InitScalarColumns());
 
-    // Start scanning dictionary filtering column readers, so we can read the dictionary
-    // pages in EvalDictionaryFilters().
-    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
+    // Prepare dictionary filtering columns for first read
+    // This must be done before dictionary filtering, because this code initializes
+    // the column offsets and streams needed to read the dictionaries.
+    // TODO: Restructure the code so that the dictionary can be read without the rest
+    // of the column.
+    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
 
-    // StartScans() may have allocated resources to scan columns. If we skip this row
+    // InitColumns() may have allocated resources to scan columns. If we skip this row
     // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
 
     // If there is a dictionary-encoded column where every value is eliminated
@@ -706,10 +703,10 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     // At this point, the row group has passed any filtering criteria
-    // Start scanning non-dictionary filtering column readers and initialize their
-    // dictionaries.
-    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
-    status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
+    // Prepare non-dictionary filtering column readers for first read and
+    // initialize their dictionaries.
+    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
+    status = InitDictionaries(non_dict_filterable_readers_);
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -744,6 +741,7 @@ Status HdfsParquetScanner::NextRowGroup() {
       break;
     }
   }
+
   DCHECK(parse_status_.ok());
   return Status::OK();
 }
@@ -801,7 +799,6 @@ void HdfsParquetScanner::PartitionReaders(
     } else {
       BaseScalarColumnReader* scalar_reader =
           static_cast<BaseScalarColumnReader*>(reader);
-      scalar_readers_.push_back(scalar_reader);
       if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
         dict_filterable_readers_.push_back(scalar_reader);
       } else {
@@ -993,7 +990,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
 
   // Any columns that were not 100% dictionary encoded need to initialize
   // their dictionaries here.
-  RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
+  RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list));
 
   return Status::OK();
 }
@@ -1442,15 +1439,12 @@ Status HdfsParquetScanner::ProcessFooter() {
         BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
 
     unique_ptr<BufferDescriptor> io_buffer;
-    bool needs_buffers;
-    RETURN_IF_ERROR(io_mgr->StartScanRange(
-          scan_node_->reader_context(), metadata_range, &needs_buffers));
-    DCHECK(!needs_buffers) << "Already provided a buffer";
-    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
+    RETURN_IF_ERROR(
+        io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
     DCHECK_EQ(io_buffer->len(), metadata_size);
     DCHECK(io_buffer->eosr());
-    metadata_range->ReturnBuffer(move(io_buffer));
+    io_mgr->ReturnBuffer(move(io_buffer));
   }
 
   // Deserialize file header
@@ -1650,16 +1644,23 @@ void HdfsParquetScanner::InitCollectionColumns() {
   }
 }
 
-Status HdfsParquetScanner::InitScalarColumns() {
+Status HdfsParquetScanner::InitScalarColumns(
+    int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
   DCHECK(file_desc != nullptr);
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
 
+  // All the scan ranges (one for each column).
+  vector<ScanRange*> col_ranges;
   // Used to validate that the number of values in each reader in column_readers_ at the
   // same SchemaElement is the same.
   unordered_map<const parquet::SchemaElement*, int> num_values_map;
-  for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
+  // Used to validate we issued the right number of scan ranges
+  int num_scalar_readers = 0;
+
+  for (BaseScalarColumnReader* scalar_reader: column_readers) {
+    ++num_scalar_readers;
     const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
     auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
     int num_values = -1;
@@ -1668,115 +1669,78 @@ Status HdfsParquetScanner::InitScalarColumns() {
     } else {
       num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
     }
+    int64_t col_start = col_chunk.meta_data.data_page_offset;
+
     if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
       // TODO: improve this error message by saying which columns are different,
       // and also specify column in other error messages as appropriate
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
     }
-    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
-  }
-  RETURN_IF_ERROR(
-      DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
-  return Status::OK();
-}
 
-Status HdfsParquetScanner::DivideReservationBetweenColumns(
-    const vector<BaseScalarColumnReader*>& column_readers,
-    int64_t reservation_to_distribute) {
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  const int64_t min_buffer_size = io_mgr->min_buffer_size();
-  const int64_t max_buffer_size = io_mgr->max_buffer_size();
-  // The HdfsScanNode reservation calculation in the planner ensures that we have
-  // reservation for at least one buffer per column.
-  if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
-    return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
-                   "least $1 bytes per column for $2 columns but had $3 bytes",
-            filename(), min_buffer_size, column_readers.size(),
-            reservation_to_distribute));
-  }
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
+        filename(), row_group_idx, scalar_reader->col_idx(),
+        scalar_reader->schema_element(), state_));
 
-  vector<int64_t> col_range_lengths(column_readers.size());
-  for (int i = 0; i < column_readers.size(); ++i) {
-    col_range_lengths[i] = column_readers[i]->scan_range()->len();
-  }
-  vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
-      min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
-  for (auto& tmp_reservation : tmp_reservations) {
-    column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
-  }
-  return Status::OK();
-}
+    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      // Already validated in ValidateColumnOffsets()
+      DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+      col_start = col_chunk.meta_data.dictionary_page_offset;
+    }
+    int64_t col_len = col_chunk.meta_data.total_compressed_size;
+    if (col_len <= 0) {
+      return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+          filename(), col_len));
+    }
+    int64_t col_end = col_start + col_len;
+
+    // Already validated in ValidateColumnOffsets()
+    DCHECK_GT(col_end, 0);
+    DCHECK_LT(col_end, file_desc->file_length);
+    if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
+      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+      // dictionary page header size in total_compressed_size and total_uncompressed_size
+      // (see IMPALA-694). We pad col_len to compensate.
+      int64_t bytes_remaining = file_desc->file_length - col_end;
+      int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+      col_len += pad;
+    }
 
-vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
-    int64_t min_buffer_size, int64_t max_buffer_size,
-    const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
-  // Pair of (column index, reservation allocated).
-  vector<pair<int, int64_t>> tmp_reservations;
-  for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
-
-  // Sort in descending order of length, breaking ties by index so that larger columns
-  // get allocated reservation first. It is common to have dramatically different column
-  // sizes in a single file because of different value sizes and compressibility. E.g.
-  // consider a large STRING "comment" field versus a highly compressible
-  // dictionary-encoded column with only a few distinct values. We want to give max-sized
-  // buffers to large columns first to maximize the size of I/Os that we do while reading
-  // this row group.
-  sort(tmp_reservations.begin(), tmp_reservations.end(),
-      [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
-        int64_t left_len = col_range_lengths[left.first];
-        int64_t right_len = col_range_lengths[right.first];
-        return left_len != right_len ? left_len > right_len : left.first < right.first;
-      });
-
-  // Set aside the minimum reservation per column.
-  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
-
-  // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
-  // or a large enough buffer to fit the remaining data for each column. Do this
-  // round-robin up to the ideal number of I/O buffers.
-  for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
-    for (auto& tmp_reservation : tmp_reservations) {
-      // Add back the reservation we set aside above.
-      if (i == 0) reservation_to_distribute += min_buffer_size;
-
-      int64_t bytes_left_in_range =
-          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
-      int64_t bytes_to_add;
-      if (bytes_left_in_range >= max_buffer_size) {
-        if (reservation_to_distribute >= max_buffer_size) {
-          bytes_to_add = max_buffer_size;
-        } else if (i == 0) {
-          DCHECK_EQ(0, tmp_reservation.second);
-          // Ensure this range gets at least one buffer on the first iteration.
-          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
-        } else {
-          DCHECK_GT(tmp_reservation.second, 0);
-          // We need to read more than the max buffer size, but can't allocate a
-          // max-sized buffer. Stop adding buffers to this column: we prefer to use
-          // the existing max-sized buffers without small buffers mixed in so that
-          // we will alway do max-sized I/Os, which make efficient use of I/O devices.
-          bytes_to_add = 0;
-        }
-      } else if (bytes_left_in_range > 0 &&
-          reservation_to_distribute >= min_buffer_size) {
-        // Choose a buffer size that will fit the rest of the bytes left in the range.
-        bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
-        // But don't add more reservation than is available.
-        bytes_to_add =
-            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
-      } else {
-        bytes_to_add = 0;
-      }
-      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
-      reservation_to_distribute -= bytes_to_add;
-      tmp_reservation.second += bytes_to_add;
-      DCHECK_GE(reservation_to_distribute, 0);
-      DCHECK_GT(tmp_reservation.second, 0);
+    // TODO: this will need to change when we have co-located files and the columns
+    // are different files.
+    if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+      return Status(Substitute("Expected parquet column file path '$0' to match "
+          "filename '$1'", col_chunk.file_path, filename()));
     }
+
+    const ScanRange* split_range =
+        static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
+
+    // Determine if the column is completely contained within a local split.
+    bool col_range_local = split_range->expected_local()
+        && col_start >= split_range->offset()
+        && col_end <= split_range->offset() + split_range->len();
+    ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
+        filename(), col_len, col_start, partition_id, split_range->disk_id(),
+        col_range_local,
+        BufferOpts(split_range->try_cache(), file_desc->mtime));
+    col_ranges.push_back(col_range);
+
+    // Get the stream that will be used for this column
+    ScannerContext::Stream* stream = context_->AddStream(col_range);
+    DCHECK(stream != nullptr);
+
+    RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
   }
-  return tmp_reservations;
+  DCHECK_EQ(col_ranges.size(), num_scalar_readers);
+
+  // Issue all the column chunks to the io mgr and have them scheduled immediately.
+  // This means these ranges aren't returned via DiskIoMgr::GetNextRange and
+  // instead are scheduled to be read immediately.
+  RETURN_IF_ERROR(scan_node_->runtime_state()->io_mgr()->AddScanRanges(
+      scan_node_->reader_context(), col_ranges, true));
+
+  return Status::OK();
 }
 
 Status HdfsParquetScanner::InitDictionaries(

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 92f2550..f0043b5 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -34,7 +34,7 @@ class CollectionValueBuilder;
 struct HdfsFileDesc;
 
 /// Internal schema representation and resolution.
-struct SchemaNode;
+class SchemaNode;
 
 /// Class that implements Parquet definition and repetition level decoding.
 class ParquetLevelDecoder;
@@ -69,8 +69,8 @@ class BoolColumnReader;
 /// the split size, the mid point guarantees that we have at least 50% of the row group in
 /// the current split. ProcessSplit() then computes the column ranges for these row groups
 /// and submits them to the IoMgr for immediate scheduling (so they don't surface in
-/// DiskIoMgr::GetNextUnstartedRange()). Scheduling them immediately also guarantees they
-/// are all read at once.
+/// DiskIoMgr::GetNextRange()). Scheduling them immediately also guarantees they are all
+/// read at once.
 ///
 /// Like the other scanners, each parquet scanner object is one to one with a
 /// ScannerContext. Unlike the other scanners though, the context will have multiple
@@ -328,7 +328,7 @@ class HdfsParquetScanner : public HdfsScanner {
   virtual ~HdfsParquetScanner() {}
 
   /// Issue just the footer range for each file.  We'll then parse the footer and pick
-  /// out the columns we want. 'files' must not be empty.
+  /// out the columns we want.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;
@@ -361,7 +361,6 @@ class HdfsParquetScanner : public HdfsScanner {
   template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
   friend class ScalarColumnReader;
   friend class BoolColumnReader;
-  friend class HdfsParquetScannerTest;
 
   /// Size of the file footer.  This is a guess.  If this value is too little, we will
   /// need to issue another read.
@@ -430,7 +429,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of scratch batches processed so far.
   int64_t row_batches_produced_;
 
-  /// Column reader for each top-level materialized slot in the output tuple.
+  /// Column reader for each materialized columns for this file.
   std::vector<ParquetColumnReader*> column_readers_;
 
   /// Column readers will write slot values into this scratch batch for
@@ -446,9 +445,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_;
 
-  /// Reservation available for scanning columns, in bytes.
-  int64_t total_col_reservation_ = 0;
-
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -465,9 +461,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// or nested within a collection.
   std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
 
-  /// Flattened list of all scalar column readers in column_readers_.
-  std::vector<BaseScalarColumnReader*> scalar_readers_;
-
   /// Flattened collection column readers that point to readers in column_readers_.
   std::vector<CollectionColumnReader*> collection_readers_;
 
@@ -634,24 +627,12 @@ class HdfsParquetScanner : public HdfsScanner {
       WARN_UNUSED_RESULT;
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
-  /// initializes 'scalar_readers_' and divides reservation between the columns but
-  /// does not start any scan ranges.
-  Status InitScalarColumns() WARN_UNUSED_RESULT;
-
-  /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
-  /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
-  Status DivideReservationBetweenColumns(
-      const std::vector<BaseScalarColumnReader*>& column_readers,
-      int64_t reservation_to_distribute);
-
-  /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
-  /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
-  /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
-  /// a vector with an entry per column with the index into 'col_range_lengths' and the
-  /// amount of reservation in bytes to give to that column.
-  static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
-      int64_t min_buffer_size, int64_t max_buffer_size,
-      const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
+  /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
+  /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
+  /// top-level scalar columns and readers of scalar columns within a collection node).
+  Status InitScalarColumns(
+      int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
+      WARN_UNUSED_RESULT;
 
   /// Initializes the column readers in collection_readers_.
   void InitCollectionColumns();
@@ -687,8 +668,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Partitions the readers into scalar and collection readers. The collection readers
   /// are flattened into collection_readers_. The scalar readers are partitioned into
   /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
-  /// dictionary filtering is enabled and the reader can be dictionary filtered. All
-  /// scalar readers are also flattened into scalar_readers_.
+  /// dictionary filtering is enabled and the reader can be dictionary filtered.
   void PartitionReaders(const vector<ParquetColumnReader*>& readers,
                         bool can_eval_dict_filters);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 5625389..98c6e14 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -35,7 +35,6 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
-#include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
@@ -65,7 +64,6 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
-      ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -82,7 +80,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
       disks_accessed_bitmap_(TUnit::UNIT, 0),
       active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
-  DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -112,6 +109,7 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
+
   return Status::OK();
 }
 
@@ -242,16 +240,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
   ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
 
-  // Check if reservation was enough to allocate at least one buffer. The
-  // reservation calculation in HdfsScanNode.java should guarantee this.
-  // Hitting this error indicates a misconfiguration or bug.
-  int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size();
-  if (scan_range_params_->size() > 0
-      && resource_profile_.min_reservation < min_buffer_size) {
-    return Status(TErrorCode::INTERNAL_ERROR,
-      Substitute("HDFS scan min reservation $0 must be >= min buffer size $1",
-       resource_profile_.min_reservation, min_buffer_size));
-  }
   // Add per volume stats to the runtime profile
   PerVolumeStats per_volume_stats;
   stringstream str;
@@ -338,18 +326,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
         partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
   }
 
-  RETURN_IF_ERROR(ClaimBufferReservation(state));
-  // We got the minimum reservation. Now try to get ideal reservation.
-  if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
-    bool increased = buffer_pool_client_.IncreaseReservation(
-        ideal_scan_range_reservation_ - resource_profile_.min_reservation);
-    VLOG_FILE << "Increasing reservation from minimum "
-              << resource_profile_.min_reservation << "B to ideal "
-              << ideal_scan_range_reservation_ << "B "
-              << (increased ? "succeeded" : "failed");
-  }
-
-  reader_context_ = runtime_state_->io_mgr()->RegisterContext();
+  reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker());
 
   // Initialize HdfsScanNode specific counters
   // TODO: Revisit counters and move the counters specific to multi-threaded scans
@@ -370,11 +347,14 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  reader_context_->set_bytes_read_counter(bytes_read_counter());
-  reader_context_->set_read_timer(read_timer());
-  reader_context_->set_open_file_timer(open_file_timer());
-  reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
-  reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
+  runtime_state_->io_mgr()->set_bytes_read_counter(
+      reader_context_.get(), bytes_read_counter());
+  runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), read_timer());
+  runtime_state_->io_mgr()->set_open_file_timer(reader_context_.get(), open_file_timer());
+  runtime_state_->io_mgr()->set_active_read_thread_counter(
+      reader_context_.get(), &active_hdfs_read_thread_counter_);
+  runtime_state_->io_mgr()->set_disks_access_bitmap(
+      reader_context_.get(), &disks_accessed_bitmap_);
 
   average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
@@ -468,27 +448,18 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
     }
   }
 
-  // Issue initial ranges for all file types. Only call functions for file types that
-  // actually exist - trying to add empty lists of ranges can result in spurious
-  // CANCELLED errors - see IMPALA-6564.
-  for (const auto& entry : matching_per_type_files) {
-    if (entry.second.empty()) continue;
-    switch (entry.first) {
-      case THdfsFileFormat::PARQUET:
-        RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second));
-        break;
-      case THdfsFileFormat::TEXT:
-        RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, entry.second));
-        break;
-      case THdfsFileFormat::SEQUENCE_FILE:
-      case THdfsFileFormat::RC_FILE:
-      case THdfsFileFormat::AVRO:
-        RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, entry.second));
-        break;
-      default:
-        DCHECK(false) << "Unexpected file type " << entry.first;
-    }
-  }
+  // Issue initial ranges for all file types.
+  RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::PARQUET]));
+  RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::TEXT]));
+  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
+  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::RC_FILE]));
+  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::AVRO]));
+
   return Status::OK();
 }
 
@@ -547,8 +518,6 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 
 Status HdfsScanNodeBase::AddDiskIoRanges(
     const vector<ScanRange*>& ranges, int num_files_queued) {
-  DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
-  DCHECK_GT(ranges.size(), 0);
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
@@ -851,14 +820,20 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
       Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
 
   if (reader_context_ != nullptr) {
-    bytes_read_local_->Set(reader_context_->bytes_read_local());
-    bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
-    bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
-    num_remote_ranges_->Set(reader_context_->num_remote_ranges());
-    unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
-    cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
+    bytes_read_local_->Set(
+        runtime_state_->io_mgr()->bytes_read_local(reader_context_.get()));
+    bytes_read_short_circuit_->Set(
+        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get()));
+    bytes_read_dn_cache_->Set(
+        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get()));
+    num_remote_ranges_->Set(static_cast<int64_t>(
+        runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get())));
+    unexpected_remote_bytes_->Set(
+        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get()));
+    cached_file_handles_hit_count_->Set(
+        runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get()));
     cached_file_handles_miss_count_->Set(
-        reader_context_->cached_file_handles_miss_count());
+        runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get()));
 
     if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
       runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3a9c37f..70fbac2 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -322,9 +322,6 @@ class HdfsScanNodeBase : public ScanNode {
   friend class ScannerContext;
   friend class HdfsScanner;
 
-  /// Ideal reservation to process each input split, computed by the planner.
-  const int64_t ideal_scan_range_reservation_;
-
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
   const int min_max_tuple_id_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index f4948d9..7ea4d80 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -26,7 +26,6 @@
 
 #include "gen-cpp/PlanNodes_types.h"
 
-using namespace impala::io;
 using std::stringstream;
 
 namespace impala {
@@ -77,27 +76,20 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_->Close(row_batch);
       scanner_.reset();
     }
-    DiskIoMgr* io_mgr = runtime_state_->io_mgr();
-    bool needs_buffers;
-    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
-        reader_context_.get(), &scan_range_, &needs_buffers));
-    if (scan_range_ == nullptr) {
+    RETURN_IF_ERROR(
+        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range_));
+    if (scan_range_ == NULL) {
       *eos = true;
       StopAndFinalizeCounters();
       return Status::OK();
     }
-    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
-    if (needs_buffers) {
-      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
-          &buffer_pool_client_, scan_range_, scanner_reservation));
-    }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
-    scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
-        partition, filter_ctxs(), expr_results_pool()));
-    scanner_ctx_->AddStream(scan_range_, scanner_reservation);
+    scanner_ctx_.reset(new ScannerContext(
+        runtime_state_, this, partition, scan_range_, filter_ctxs(),
+        expr_results_pool()));
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {
       DCHECK(scanner_ == NULL);

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index a95e47a..710a8af 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -26,7 +26,6 @@
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
-#include "runtime/io/request-context.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
@@ -46,6 +45,16 @@ DECLARE_bool(skip_file_runtime_filtering);
 using namespace impala;
 using namespace impala::io;
 
+// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
+// The memory used does not vary considerably between file formats (just a couple of MBs).
+// This value is conservative and taken from running against the tpch lineitem table.
+// TODO: revisit how we do this.
+const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
+
+// Estimated upper bound on the compression ratio of compressed text files. Used to
+// estimate scanner thread memory usage.
+const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
+
 // Amount of time to block waiting for GetNext() to release scanner threads between
 // checking if a scanner thread should yield itself back to the global thread pool.
 const int SCANNER_THREAD_WAIT_TIME_MS = 20;
@@ -53,6 +62,11 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20;
 HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
     : HdfsScanNodeBase(pool, tnode, descs),
+      ranges_issued_barrier_(1),
+      scanner_thread_bytes_required_(0),
+      done_(false),
+      all_ranges_started_(false),
+      thread_avail_cb_id_(-1),
       max_num_scanner_threads_(CpuInfo::num_cores()) {
 }
 
@@ -153,6 +167,36 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
+
+  // Compute the minimum bytes required to start a new thread. This is based on the
+  // file format.
+  // The higher the estimate, the less likely it is the query will fail but more likely
+  // the query will be throttled when it does not need to be.
+  // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to
+  // one but if there are already buffers queued before memory pressure was hit, we can't
+  // reclaim that memory.
+  if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
+    // Parquet files require buffers per column
+    scanner_thread_bytes_required_ =
+        materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size();
+  } else {
+    scanner_thread_bytes_required_ =
+        3 * runtime_state_->io_mgr()->max_read_buffer_size();
+  }
+  // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
+  // Next we add in the other memory the scanner thread will use.
+  // e.g. decompression buffers, tuple buffers, etc.
+  // For compressed text, we estimate this based on the file size (since the whole file
+  // will need to be decompressed at once). For all other formats, we use a constant.
+  // TODO: can we do something better?
+  int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
+  for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) {
+    if (file->file_compression != THdfsCompression::NONE) {
+      int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
+      scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
+    }
+  }
+  scanner_thread_bytes_required_ += scanner_thread_mem_usage;
   row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
   return Status::OK();
@@ -174,9 +218,10 @@ Status HdfsScanNode::Open(RuntimeState* state) {
     max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
   DCHECK_GT(max_num_scanner_threads_, 0);
-  spare_reservation_.Store(buffer_pool_client_.GetReservation());
+
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
+
   return Status::OK();
 }
 
@@ -217,28 +262,37 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
-bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
-  int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
-  if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
-  spare_reservation_.Add(increase);
-  return true;
-}
-
-int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
-    bool first_thread) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  int64_t amount;
-  if (first_thread) {
-    amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
-        ideal_scan_range_reservation_ : resource_profile_.min_reservation;
-  } else {
-    amount = ideal_scan_range_reservation_;
+// For controlling the amount of memory used for scanners, we approximate the
+// scanner mem usage based on scanner_thread_bytes_required_, rather than the
+// consumption in the scan node's mem tracker. The problem with the scan node
+// trackers is that it does not account for the memory the scanner will use.
+// For example, if there is 110 MB of memory left (based on the mem tracker)
+// and we estimate that a scanner will use 100MB, we want to make sure to only
+// start up one additional thread. However, after starting the first thread, the
+// mem tracker value will not change immediately (it takes some time before the
+// scanner is running and starts using memory). Therefore we just use the estimate
+// based on the number of running scanner threads.
+bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
+  int64_t committed_scanner_mem =
+      active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
+  int64_t tracker_consumption = mem_tracker()->consumption();
+  int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
+  if (est_additional_scanner_mem < 0) {
+    // This is the case where our estimate was too low. Expand the estimate based
+    // on the usage.
+    int64_t avg_consumption =
+        tracker_consumption / active_scanner_thread_counter_.value();
+    // Take the average and expand it by 50%. Some scanners will not have hit their
+    // steady state mem usage yet.
+    // TODO: how can we scale down if we've overestimated.
+    // TODO: better heuristic?
+    scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
+    est_additional_scanner_mem = 0;
   }
-  int64_t remainder = spare_reservation_.Add(-amount);
-  DCHECK_GE(remainder, 0);
-  return amount;
+
+  // If we are starting a new thread, take that into account now.
+  if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
+  return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
 }
 
 void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
@@ -268,45 +322,36 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     // TODO: This still leans heavily on starvation-free locks, come up with a more
     // correct way to communicate between this method and ScannerThreadHelper
     unique_lock<mutex> lock(lock_);
-
-    const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
-    const bool first_thread = num_active_scanner_threads == 0;
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
-        num_active_scanner_threads >= progress_.remaining()) {
+        active_scanner_thread_counter_.value() >= progress_.remaining()) {
       break;
     }
 
     // Cases 5 and 6.
-    if (!first_thread &&
+    if (active_scanner_thread_counter_.value() > 0 &&
         (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
-         !EnoughReservationForExtraThread(lock))) {
+         !EnoughMemoryForScannerThread(true))) {
       break;
     }
 
     // Case 7 and 8.
-    if (num_active_scanner_threads >= max_num_scanner_threads_ ||
-        !pool->TryAcquireThreadToken()) {
+    bool is_reserved = false;
+    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
+        !pool->TryAcquireThreadToken(&is_reserved)) {
       break;
     }
 
-    // Deduct the reservation. We haven't dropped the lock since the
-    // first_thread/EnoughReservationForExtraThread() checks so spare reservation
-    // must be available.
-    int64_t scanner_thread_reservation =
-        DeductReservationForScannerThread(lock, first_thread);
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
-    auto fn = [this, scanner_thread_reservation]() {
-      this->ScannerThread(scanner_thread_reservation);
-    };
+
+    auto fn = [this]() { this->ScannerThread(); };
     std::unique_ptr<Thread> t;
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
-      ReturnReservationFromScannerThread(scanner_thread_reservation);
       COUNTER_ADD(&active_scanner_thread_counter_, -1);
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
@@ -327,10 +372,9 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   }
 }
 
-void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
+void HdfsScanNode::ScannerThread() {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
-  DiskIoMgr* io_mgr = runtime_state_->io_mgr();
 
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
@@ -356,7 +400,8 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
       // this thread.
       unique_lock<mutex> l(lock_);
       if (active_scanner_thread_counter_.value() > 1) {
-        if (runtime_state_->resource_pool()->optional_exceeded()) {
+        if (runtime_state_->resource_pool()->optional_exceeded() ||
+            !EnoughMemoryForScannerThread(false)) {
           // We can't break here. We need to update the counter with the lock held or else
           // all threads might see active_scanner_thread_counter_.value > 1
           COUNTER_ADD(&active_scanner_thread_counter_, -1);
@@ -376,29 +421,21 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange().
+    ScanRange* scan_range;
+    // Take a snapshot of num_unqueued_files_ before calling GetNextRange().
     // We don't want num_unqueued_files_ to go to zero between the return from
-    // GetNextUnstartedRange() and the check for when all ranges are complete.
+    // GetNextRange() and the check for when all ranges are complete.
     int num_unqueued_files = num_unqueued_files_.Load();
     // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any earlier
     // stores that need to complete?)
     AtomicUtil::MemoryBarrier();
-    ScanRange* scan_range;
-    bool needs_buffers;
     Status status =
-        io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers);
+        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range);
 
-    if (status.ok() && scan_range != nullptr) {
-      if (needs_buffers) {
-        status = io_mgr->AllocateBuffersForRange(
-            reader_context_.get(), &buffer_pool_client_, scan_range,
-            scanner_thread_reservation);
-      }
-      if (status.ok()) {
-        // Got a scan range. Process the range end to end (in this thread).
-        status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-            &expr_results_pool, scan_range, scanner_thread_reservation);
-      }
+    if (status.ok() && scan_range != NULL) {
+      // Got a scan range. Process the range end to end (in this thread).
+      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+          &expr_results_pool, scan_range);
     }
 
     if (!status.ok()) {
@@ -428,8 +465,8 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
       // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
       // needed to acquire the lock in x86.
       unique_lock<mutex> l(lock_);
-      // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means
-      // that every range is either done or being processed by another thread.
+      // All ranges have been queued and GetNextRange() returned NULL. This means that
+      // every range is either done or being processed by another thread.
       all_ranges_started_ = true;
       break;
     }
@@ -437,7 +474,6 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
-  ReturnReservationFromScannerThread(scanner_thread_reservation);
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
@@ -445,8 +481,7 @@ exit:
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
-    MemPool* expr_results_pool, ScanRange* scan_range,
-    int64_t scanner_thread_reservation) {
+    MemPool* expr_results_pool, ScanRange* scan_range) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -471,9 +506,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     return Status::OK();
   }
 
-  ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
-      filter_ctxs, expr_results_pool);
-  context.AddStream(scan_range, scanner_thread_reservation);
+  ScannerContext context(
+      runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {
@@ -515,7 +549,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
 void HdfsScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
-  if (reader_context_ != nullptr) reader_context_->Cancel();
+  if (reader_context_ != nullptr) {
+    runtime_state_->io_mgr()->CancelContext(reader_context_.get());
+  }
   materialized_row_batches_->Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 81e826e..a1c97cf 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -27,7 +27,6 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
-#include "common/atomic.h"
 #include "exec/filter-context.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -59,14 +58,8 @@ class TPlanNode;
 /// 5. The scanner finishes the scan range and informs the scan node so it can track
 ///    end of stream.
 ///
-/// Buffer management:
-/// ------------------
-/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
-/// client. The scan node ensures that enough reservation is available to start a
-/// scanner thread before launching each one with (see
-/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
-/// for staying within the reservation handed off to it.
-///
+/// TODO: This class allocates a bunch of small utility objects that should be
+/// recycled.
 /// TODO: Remove this class once the fragment-based multi-threaded execution is
 /// fully functional.
 class HdfsScanNode : public HdfsScanNodeBase {
@@ -107,7 +100,12 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
  private:
   /// Released when initial ranges are issued in the first call to GetNext().
-  CountingBarrier ranges_issued_barrier_{1};
+  CountingBarrier ranges_issued_barrier_;
+
+  /// The estimated memory required to start up a new scanner thread. If the memory
+  /// left (due to limits) is less than this value, we won't start up optional
+  /// scanner threads.
+  int64_t scanner_thread_bytes_required_;
 
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
@@ -132,16 +130,16 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// are finished, an error/cancellation occurred, or the limit was reached.
   /// Setting this to true triggers the scanner threads to clean up.
   /// This should not be explicitly set. Instead, call SetDone().
-  bool done_ = false;
+  bool done_;
 
   /// Set to true if all ranges have started. Some of the ranges may still be in flight
   /// being processed by scanner threads, but no new ScannerThreads should be started.
-  bool all_ranges_started_ = false;
+  bool all_ranges_started_;
 
   /// The id of the callback added to the thread resource manager when thread token
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
-  int thread_avail_cb_id_ = -1;
+  int thread_avail_cb_id_;
 
   /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
   /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
@@ -149,14 +147,6 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// the number of cores.
   int max_num_scanner_threads_;
 
-  /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
-  /// threads. Doled out to scanner threads when they are started and returned when
-  /// those threads no longer need it. Can be atomically incremented without holding
-  /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
-  /// reservation and the actual deduction are atomic with respect to other threads
-  /// trying to claim reservation.
-  AtomicInt64 spare_reservation_{0};
-
   /// The wait time for fetching a row batch from the row batch queue.
   RuntimeProfile::Counter* row_batches_get_timer_;
 
@@ -170,35 +160,21 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// Main function for scanner thread. This thread pulls the next range to be
   /// processed from the IoMgr and then processes the entire range end to end.
   /// This thread terminates when all scan ranges are complete or an error occurred.
-  /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
-  /// this thread with DeductReservationForScannerThread().
-  void ScannerThread(int64_t scanner_thread_reservation);
+  void ScannerThread();
 
   /// Process the entire scan range with a new scanner object. Executed in scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      MemPool* expr_results_pool, io::ScanRange* scan_range,
-      int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
-
-  /// Return true if there is enough reservation to start an extra scanner thread.
-  /// Tries to increase reservation if enough is not already available in
-  /// 'spare_reservation_'. 'lock_' must be held via 'lock'
-  bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
-
-  /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
-  /// 'first_thread' is true, this is the first thread to be started and only the
-  /// minimum reservation is required to be available. Otherwise
-  /// EnoughReservationForExtra() thread must have returned true in the current
-  /// critical section so that 'ideal_scan_range_bytes_' is available for the extra
-  /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
-  int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
-      bool first_thread);
-
-  /// Called by scanner thread to return or all of its reservation that is not needed.
-  void ReturnReservationFromScannerThread(int64_t bytes) {
-    spare_reservation_.Add(bytes);
-  }
+      MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
+
+  /// Returns true if there is enough memory (against the mem tracker limits) to
+  /// have a scanner thread.
+  /// If new_thread is true, the calculation is for starting a new scanner thread.
+  /// If false, it determines whether there's adequate memory for the existing
+  /// set of scanner threads.
+  /// lock_ must be taken before calling this.
+  bool EnoughMemoryForScannerThread(bool new_thread);
 
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 2a6a912..253bcc8 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -144,10 +144,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         DCHECK(false);
     }
   }
-  if (compressed_text_scan_ranges.size() > 0) {
-    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
-        compressed_text_files));
-  }
+  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+          compressed_text_files));
   if (lzo_text_files.size() > 0) {
     // This will dlopen the lzo binary and can fail if the lzo binary is not present.
     RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 5afb4e5..7cf89ba 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -30,7 +30,6 @@
 #include "exec/scanner-context.inline.h"
 #include "rpc/thrift-util.h"
 #include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/runtime-state.h"
@@ -48,10 +47,6 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
     "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
     "be converted from UTC to local time. Writes are unaffected.");
 
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-static const int MAX_DICT_HEADER_SIZE = 100;
-
 // Max data page header size in bytes. This is an estimate and only needs to be an upper
 // bound. It is theoretically possible to have a page header of any size due to string
 // value statistics, but in practice we'll have trouble reading string values this large.
@@ -71,8 +66,6 @@ static int debug_count = 0;
 #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
 #endif
 
-using namespace impala::io;
-
 namespace impala {
 
 const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
@@ -841,99 +834,8 @@ static bool RequiresSkippedDictionaryHeaderCheck(
   return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
 }
 
-Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
-    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
-  num_buffered_values_ = 0;
-  data_ = nullptr;
-  data_end_ = nullptr;
-  stream_ = nullptr;
-  io_reservation_ = 0;
-  metadata_ = &col_chunk.meta_data;
-  num_values_read_ = 0;
-  def_level_ = -1;
-  // See ColumnReader constructor.
-  rep_level_ = max_rep_level() == 0 ? 0 : -1;
-  pos_current_value_ = -1;
-
-  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-    RETURN_IF_ERROR(Codec::CreateDecompressor(
-        nullptr, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
-  }
-  int64_t col_start = col_chunk.meta_data.data_page_offset;
-
-  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
-      parent_->filename(), row_group_idx, col_idx(), schema_element(),
-      parent_->state_));
-
-  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-    // Already validated in ValidateColumnOffsets()
-    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
-    col_start = col_chunk.meta_data.dictionary_page_offset;
-  }
-  int64_t col_len = col_chunk.meta_data.total_compressed_size;
-  if (col_len <= 0) {
-    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
-        filename(), col_len));
-  }
-  int64_t col_end = col_start + col_len;
-
-  // Already validated in ValidateColumnOffsets()
-  DCHECK_GT(col_end, 0);
-  DCHECK_LT(col_end, file_desc.file_length);
-  const ParquetFileVersion& file_version = parent_->file_version_;
-  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
-    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
-    // dictionary page header size in total_compressed_size and total_uncompressed_size
-    // (see IMPALA-694). We pad col_len to compensate.
-    int64_t bytes_remaining = file_desc.file_length - col_end;
-    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
-    col_len += pad;
-  }
-
-  // TODO: this will need to change when we have co-located files and the columns
-  // are different files.
-  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
-    return Status(Substitute("Expected parquet column file path '$0' to match "
-        "filename '$1'", col_chunk.file_path, filename()));
-  }
-
-  const ScanRange* metadata_range = parent_->metadata_range_;
-  int64_t partition_id = parent_->context_->partition_descriptor()->id();
-  const ScanRange* split_range =
-      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
-  // Determine if the column is completely contained within a local split.
-  bool col_range_local = split_range->expected_local()
-      && col_start >= split_range->offset()
-      && col_end <= split_range->offset() + split_range->len();
-  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
-      filename(), col_len, col_start, partition_id, split_range->disk_id(),
-      col_range_local,
-      BufferOpts(split_range->try_cache(), file_desc.mtime));
-  ClearDictionaryDecoder();
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::StartScan() {
-  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  ScannerContext* context = parent_->context_;
-  DCHECK_GT(io_reservation_, 0);
-  bool needs_buffers;
-  RETURN_IF_ERROR(io_mgr->StartScanRange(
-      parent_->scan_node_->reader_context(), scan_range_, &needs_buffers));
-  if (needs_buffers) {
-    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-        parent_->scan_node_->reader_context(), context->bp_client(),
-        scan_range_, io_reservation_));
-  }
-  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
-  DCHECK(stream_ != nullptr);
-  return Status::OK();
-}
-
 Status BaseScalarColumnReader::ReadPageHeader(bool peek,
     parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
-  DCHECK(stream_ != nullptr);
   *eos = false;
 
   uint8_t* buffer;
@@ -1017,7 +919,7 @@ Status BaseScalarColumnReader::InitDictionary() {
   bool eos;
   parquet::PageHeader next_page_header;
   uint32_t next_header_size;
-  DCHECK(stream_ != nullptr);
+
   DCHECK(!HasDictionaryDecoder());
 
   RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
@@ -1129,14 +1031,6 @@ Status BaseScalarColumnReader::InitDictionary() {
   return Status::OK();
 }
 
-Status BaseScalarColumnReader::InitDictionaries(
-    const vector<BaseScalarColumnReader*> readers) {
-  for (BaseScalarColumnReader* reader : readers) {
-    RETURN_IF_ERROR(reader->InitDictionary());
-  }
-  return Status::OK();
-}
-
 Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page.  The previous data page is
   // now complete, free up any memory allocated for it. If the data page contained

http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index f19d40a..86ca239 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -322,29 +322,42 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : ParquetColumnReader(parent, node, slot_desc),
+      data_(NULL),
+      data_end_(NULL),
+      def_levels_(true),
+      rep_levels_(false),
+      page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
+      num_buffered_values_(0),
+      num_values_read_(0),
+      metadata_(NULL),
+      stream_(NULL),
       data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
     DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
   }
 
   virtual ~BaseScalarColumnReader() { }
 
-  /// Resets the reader for each row group in the file and creates the scan
-  /// range for the column, but does not start it. To start scanning,
-  /// set_io_reservation() must be called to assign reservation to this
-  /// column, followed by StartScan().
-  Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
-    int row_group_idx);
-
-  /// Starts the column scan range. The reader must be Reset() and have a
-  /// reservation assigned via set_io_reservation(). This must be called
-  /// before any of the column data can be read (including dictionary and
-  /// data pages). Returns an error status if there was an error starting the
-  /// scan or allocating buffers for it.
-  Status StartScan();
-
-  /// Helper to start scans for multiple columns at once.
-  static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
-    for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
+  /// This is called once for each row group in the file.
+  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
+    DCHECK(stream != NULL);
+    DCHECK(metadata != NULL);
+
+    num_buffered_values_ = 0;
+    data_ = NULL;
+    data_end_ = NULL;
+    stream_ = stream;
+    metadata_ = metadata;
+    num_values_read_ = 0;
+    def_level_ = -1;
+    // See ColumnReader constructor.
+    rep_level_ = max_rep_level() == 0 ? 0 : -1;
+    pos_current_value_ = -1;
+
+    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+      RETURN_IF_ERROR(Codec::CreateDecompressor(
+          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+    }
+    ClearDictionaryDecoder();
     return Status::OK();
   }
 
@@ -359,27 +372,22 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     if (dict_decoder != nullptr) dict_decoder->Close();
   }
 
-  io::ScanRange* scan_range() const { return scan_range_; }
   int64_t total_len() const { return metadata_->total_compressed_size; }
   int col_idx() const { return node_.col_idx; }
   THdfsCompression::type codec() const {
     if (metadata_ == NULL) return THdfsCompression::NONE;
     return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
   }
-  void set_io_reservation(int bytes) { io_reservation_ = bytes; }
 
   /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
   virtual bool NextLevels() { return NextLevels<true>(); }
 
-  /// Check the data stream to see if there is a dictionary page. If there is,
-  /// use that page to initialize dict_decoder_ and advance the data stream
-  /// past the dictionary page.
+  // Check the data stream to see if there is a dictionary page. If there is,
+  // use that page to initialize dict_decoder_ and advance the data stream
+  // past the dictionary page.
   Status InitDictionary();
 
-  /// Convenience function to initialize multiple dictionaries.
-  static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
-
   // Returns the dictionary or NULL if the dictionary doesn't exist
   virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
 
@@ -405,45 +413,33 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   // fit in as few cache lines as possible.
 
   /// Pointer to start of next value in data page
-  uint8_t* data_ = nullptr;
+  uint8_t* data_;
 
   /// End of the data page.
-  const uint8_t* data_end_ = nullptr;
+  const uint8_t* data_end_;
 
   /// Decoder for definition levels.
-  ParquetLevelDecoder def_levels_{true};
+  ParquetLevelDecoder def_levels_;
 
   /// Decoder for repetition levels.
-  ParquetLevelDecoder rep_levels_{false};
+  ParquetLevelDecoder rep_levels_;
 
   /// Page encoding for values of the current data page. Cached here for perf. Set in
   /// InitDataPage().
-  parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
+  parquet::Encoding::type page_encoding_;
 
   /// Num values remaining in the current data page
-  int num_buffered_values_ = 0;
+  int num_buffered_values_;
 
   // Less frequently used members that are not accessed in inner loop should go below
   // here so they do not occupy precious cache line space.
 
   /// The number of values seen so far. Updated per data page.
-  int64_t num_values_read_ = 0;
-
-  /// Metadata for the column for the current row group.
-  const parquet::ColumnMetaData* metadata_ = nullptr;
+  int64_t num_values_read_;
 
+  const parquet::ColumnMetaData* metadata_;
   boost::scoped_ptr<Codec> decompressor_;
-
-  /// The scan range for the column's data. Initialized for each row group by Reset().
-  io::ScanRange* scan_range_ = nullptr;
-
-  // Stream used to read data from 'scan_range_'. Initialized by StartScan().
-  ScannerContext::Stream* stream_ = nullptr;
-
-  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
-  /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
-  /// by Reset().
-  int64_t io_reservation_ = 0;
+  ScannerContext::Stream* stream_;
 
   /// Pool to allocate storage for data pages from - either decompression buffers for
   /// compressed data pages or copies of the data page with var-len data to attach to