You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/02/10 10:40:39 UTC

[impala] branch master updated: IMPALA-6636: Use async IO in ORC scanner

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 97dda2b  IMPALA-6636: Use async IO in ORC scanner
97dda2b is described below

commit 97dda2b27da99367f4d07699aa046b16cda16dd4
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Mar 5 14:46:03 2020 +0100

    IMPALA-6636: Use async IO in ORC scanner
    
    This patch implements async IO in the ORC scanner. For each ORC stripe,
    we begin by iterating over the column streams. If a column stream is
    eligible for async IO, we create a ColumnRange for it, register a
    ScannerContext::Stream for that ORC stream, and start the stream. We
    modify HdfsOrcScanner::ScanRangeInputStream::read to check whether there
    is a matching ColumnRange for the given offset and length. If so, the
    read continues through HdfsOrcScanner::ColumnRange::read.
    
    We leverage existing async IO methods from HdfsParquetScanner class for
    initial memory allocations. We moved related methods such as
    DivideReservationBetweenColumns and ComputeIdealReservation up to
    HdfsColumnarScanner class.
    
    Planner calculates the memory reservation differently between async
    Parquet and async ORC. In async Parquet, the planner calculates the
    column memory reservation and relies on the backend to divide them as
    needed. In async ORC, the planner needs to split the column's memory
    reservation based on the estimated number of streams for that column
    type. For example, a string column with a 4MB memory estimate will need
    to split that estimate into four 1MB reservations because it might use
    dictionary encoding with four streams (PRESENT, DATA, DICTIONARY_DATA,
    and LENGTH). This splitting is required because each async IO stream needs
    to start with an 8KB (min_buffer_size) initial memory reservation.
    
    To show the improvement from ORC async IO, we contrast the total time
    and geomean (in milliseconds) to run full TPC-DS 10 TB, 19 executors,
    with varying ORC_ASYNC_READ and DISABLE_DATA_CACHE options as follows:
    
    +----------------------+------------------+------------------+
    | Total time           | ORC_ASYNC_READ=0 | ORC_ASYNC_READ=1 |
    +----------------------+------------------+------------------+
    | DISABLE_DATA_CACHE=0 |          3511075 |          3484736 |
    | DISABLE_DATA_CACHE=1 |          5243337 |          4370095 |
    +----------------------+------------------+------------------+
    
    +----------------------+------------------+------------------+
    | Geomean              | ORC_ASYNC_READ=0 | ORC_ASYNC_READ=1 |
    +----------------------+------------------+------------------+
    | DISABLE_DATA_CACHE=0 |      12786.58042 |      12454.80365 |
    | DISABLE_DATA_CACHE=1 |      23081.10888 |      16692.31512 |
    +----------------------+------------------+------------------+
    
    Testing:
    - Pass core tests.
    - Pass core e2e tests with ORC_ASYNC_READ=1.
    
    Change-Id: I348ad9e55f0cae7dff0d74d941b026dcbf5e4074
    Reviewed-on: http://gerrit.cloudera.org:8080/15370
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-columnar-scanner.cc               | 194 +++++++++++++++++
 be/src/exec/hdfs-columnar-scanner.h                |  71 ++++++
 be/src/exec/hdfs-orc-scanner.cc                    | 238 ++++++++++++++++++---
 be/src/exec/hdfs-orc-scanner.h                     |  85 ++++++--
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 147 +------------
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  41 ----
 be/src/exec/parquet/parquet-page-index.cc          |   1 +
 be/src/exec/parquet/parquet-page-reader.cc         |  19 +-
 be/src/exec/scanner-context.cc                     |  14 ++
 be/src/exec/scanner-context.h                      |   5 +
 be/src/runtime/io/disk-io-mgr.h                    |   2 +-
 be/src/runtime/io/request-ranges.h                 |   5 +
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   3 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   3 +
 .../org/apache/impala/planner/HdfsScanNode.java    | 138 +++++++++---
 .../queries/PlannerTest/resource-requirements.test |  18 +-
 .../queries/QueryTest/scanner-reservation.test     |  10 +-
 19 files changed, 721 insertions(+), 280 deletions(-)

diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc
index 708c56a..9131429 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -18,16 +18,53 @@
 #include "exec/hdfs-columnar-scanner.h"
 
 #include <algorithm>
+#include <gutil/strings/substitute.h>
 
 #include "codegen/llvm-codegen.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/scratch-tuple-batch.h"
+#include "runtime/exec-env.h"
 #include "runtime/fragment-state.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+using namespace std;
+using namespace strings;
 
 namespace impala {
 
+PROFILE_DEFINE_COUNTER(
+    NumColumns, STABLE_LOW, TUnit::UNIT, "Number of columns that need to be read.");
+PROFILE_DEFINE_COUNTER(NumScannersWithNoReads, STABLE_LOW, TUnit::UNIT,
+    "Number of scanners that end up doing no reads because their splits don't overlap "
+    "with the midpoint of any row-group/stripe in the file.");
+PROFILE_DEFINE_SUMMARY_STATS_TIMER(FooterProcessingTime, STABLE_LOW,
+    "Average and min/max time spent processing the footer by each split.");
+PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ColumnarScannerIdealReservation, DEBUG, TUnit::BYTES,
+    "Tracks stats about the ideal reservation for a scanning a row group (parquet) or "
+    "stripe (orc). The ideal reservation is calculated based on min and max buffer "
+    "size.");
+PROFILE_DEFINE_SUMMARY_STATS_COUNTER(ColumnarScannerActualReservation, DEBUG,
+    TUnit::BYTES,
+    "Tracks stats about the actual reservation for a scanning a row group "
+    "(parquet) or stripe (orc).");
+PROFILE_DEFINE_COUNTER(IoReadSyncRequest, DEBUG, TUnit::UNIT,
+    "Number of stream read request done in synchronized manner.");
+PROFILE_DEFINE_COUNTER(IoReadAsyncRequest, DEBUG, TUnit::UNIT,
+    "Number of stream read request done in asynchronized manner.");
+PROFILE_DEFINE_COUNTER(
+    IoReadTotalRequest, DEBUG, TUnit::UNIT, "Total number of stream read request.");
+PROFILE_DEFINE_COUNTER(IoReadSyncBytes, DEBUG, TUnit::BYTES,
+    "The total number of bytes read from streams in synchronized manner.");
+PROFILE_DEFINE_COUNTER(IoReadAsyncBytes, DEBUG, TUnit::BYTES,
+    "The total number of bytes read from streams in asynchronized manner.");
+PROFILE_DEFINE_COUNTER(IoReadTotalBytes, DEBUG, TUnit::BYTES,
+    "The total number of bytes read from streams.");
+PROFILE_DEFINE_COUNTER(IoReadSkippedBytes, DEBUG, TUnit::BYTES,
+    "The total number of bytes skipped from streams.");
+
 const char* HdfsColumnarScanner::LLVM_CLASS_NAME = "class.impala::HdfsColumnarScanner";
 
 HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node,
@@ -39,6 +76,28 @@ HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node,
 
 HdfsColumnarScanner::~HdfsColumnarScanner() {}
 
+Status HdfsColumnarScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  RuntimeProfile* profile = scan_node_->runtime_profile();
+  num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile);
+  num_scanners_with_no_reads_counter_ =
+      PROFILE_NumScannersWithNoReads.Instantiate(profile);
+  process_footer_timer_stats_ = PROFILE_FooterProcessingTime.Instantiate(profile);
+  columnar_scanner_ideal_reservation_counter_ =
+      PROFILE_ColumnarScannerIdealReservation.Instantiate(profile);
+  columnar_scanner_actual_reservation_counter_ =
+      PROFILE_ColumnarScannerActualReservation.Instantiate(profile);
+
+  io_sync_request_ = PROFILE_IoReadSyncRequest.Instantiate(profile);
+  io_sync_bytes_ = PROFILE_IoReadSyncBytes.Instantiate(profile);
+  io_async_request_ = PROFILE_IoReadAsyncRequest.Instantiate(profile);
+  io_async_bytes_ = PROFILE_IoReadAsyncBytes.Instantiate(profile);
+  io_total_request_ = PROFILE_IoReadTotalRequest.Instantiate(profile);
+  io_total_bytes_ = PROFILE_IoReadTotalBytes.Instantiate(profile);
+  io_skipped_bytes_ = PROFILE_IoReadSkippedBytes.Instantiate(profile);
+  return Status::OK();
+}
+
 int HdfsColumnarScanner::FilterScratchBatch(RowBatch* dst_batch) {
   // This function must not be called when the output batch is already full. As long as
   // we always call CommitRows() after TransferScratchTuples(), the output batch can
@@ -113,4 +172,139 @@ int HdfsColumnarScanner::ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_bat
       dst_batch);
 }
 
+HdfsColumnarScanner::ColumnReservations
+HdfsColumnarScanner::DivideReservationBetweenColumnsHelper(int64_t min_buffer_size,
+    int64_t max_buffer_size, const ColumnRangeLengths& col_range_lengths,
+    int64_t reservation_to_distribute) {
+  // Pair of (column index, reservation allocated).
+  ColumnReservations tmp_reservations;
+  for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
+
+  // Sort in descending order of length, breaking ties by index so that larger columns
+  // get allocated reservation first. It is common to have dramatically different column
+  // sizes in a single file because of different value sizes and compressibility. E.g.
+  // consider a large STRING "comment" field versus a highly compressible
+  // dictionary-encoded column with only a few distinct values. We want to give max-sized
+  // buffers to large columns first to maximize the size of I/Os that we do while reading
+  // this row group.
+  sort(tmp_reservations.begin(), tmp_reservations.end(),
+      [&col_range_lengths](
+          const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
+        int64_t left_len = col_range_lengths[left.first];
+        int64_t right_len = col_range_lengths[right.first];
+        return (left_len != right_len) ? (left_len > right_len) :
+                                         (left.first < right.first);
+      });
+
+  // Set aside the minimum reservation per column.
+  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
+
+  // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
+  // or a large enough buffer to fit the remaining data for each column. Do this
+  // round-robin up to the ideal number of I/O buffers.
+  for (int i = 0; i < io::DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
+    for (auto& tmp_reservation : tmp_reservations) {
+      // Add back the reservation we set aside above.
+      if (i == 0) reservation_to_distribute += min_buffer_size;
+
+      int64_t bytes_left_in_range =
+          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
+      int64_t bytes_to_add;
+      if (bytes_left_in_range >= max_buffer_size) {
+        if (reservation_to_distribute >= max_buffer_size) {
+          bytes_to_add = max_buffer_size;
+        } else if (i == 0) {
+          DCHECK_EQ(0, tmp_reservation.second);
+          // Ensure this range gets at least one buffer on the first iteration.
+          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
+        } else {
+          DCHECK_GT(tmp_reservation.second, 0);
+          // We need to read more than the max buffer size, but can't allocate a
+          // max-sized buffer. Stop adding buffers to this column: we prefer to use
+          // the existing max-sized buffers without small buffers mixed in so that
+          // we will alway do max-sized I/Os, which make efficient use of I/O devices.
+          bytes_to_add = 0;
+        }
+      } else if (bytes_left_in_range > 0
+          && reservation_to_distribute >= min_buffer_size) {
+        // Choose a buffer size that will fit the rest of the bytes left in the range.
+        bytes_to_add =
+            max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
+        // But don't add more reservation than is available.
+        bytes_to_add =
+            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
+      } else {
+        bytes_to_add = 0;
+      }
+      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
+      reservation_to_distribute -= bytes_to_add;
+      tmp_reservation.second += bytes_to_add;
+
+      DCHECK_GE(reservation_to_distribute, 0);
+      DCHECK_GT(tmp_reservation.second, 0);
+    }
+  }
+  return tmp_reservations;
+}
+
+int64_t HdfsColumnarScanner::ComputeIdealReservation(
+    const ColumnRangeLengths& col_range_lengths) {
+  io::DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  int64_t ideal_reservation = 0;
+  for (int64_t len : col_range_lengths) {
+    ideal_reservation += io_mgr->ComputeIdealBufferReservation(len);
+  }
+  return ideal_reservation;
+}
+
+Status HdfsColumnarScanner::DivideReservationBetweenColumns(
+    const ColumnRangeLengths& col_range_lengths,
+    ColumnReservations& reservation_per_column) {
+  io::DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  const int64_t min_buffer_size = io_mgr->min_buffer_size();
+  const int64_t max_buffer_size = io_mgr->max_buffer_size();
+  // The HdfsScanNode reservation calculation in the planner ensures that we have
+  // reservation for at least one buffer per column.
+  if (context_->total_reservation() < min_buffer_size * col_range_lengths.size()) {
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Not enough reservation in columnar scanner for file '$0'. "
+                   "Need at least $1 bytes per column for $2 columns but had $3 bytes",
+            filename(), min_buffer_size, col_range_lengths.size(),
+            context_->total_reservation()));
+  }
+
+  // The scanner-wide stream was used only to read the file footer.  Each column has added
+  // its own stream. We can use the total reservation now that 'stream_''s resources have
+  // been released. We may benefit from increasing reservation further, so let's compute
+  // the ideal reservation to scan all the columns.
+  int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths);
+  if (ideal_reservation > context_->total_reservation()) {
+    context_->TryIncreaseReservation(ideal_reservation);
+  }
+  columnar_scanner_actual_reservation_counter_->UpdateCounter(
+      context_->total_reservation());
+  columnar_scanner_ideal_reservation_counter_->UpdateCounter(ideal_reservation);
+
+  reservation_per_column = DivideReservationBetweenColumnsHelper(
+      min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation());
+  return Status::OK();
+}
+
+void HdfsColumnarScanner::AddSyncReadBytesCounter(int64_t total_bytes) {
+  io_sync_request_->Add(1);
+  io_total_request_->Add(1);
+  io_sync_bytes_->Add(total_bytes);
+  io_total_bytes_->Add(total_bytes);
+}
+
+void HdfsColumnarScanner::AddAsyncReadBytesCounter(int64_t total_bytes) {
+  io_async_request_->Add(1);
+  io_total_request_->Add(1);
+  io_async_bytes_->Add(total_bytes);
+  io_total_bytes_->Add(total_bytes);
+}
+
+void HdfsColumnarScanner::AddSkippedReadBytesCounter(int64_t total_bytes) {
+  io_skipped_bytes_->Add(total_bytes);
+}
 }
diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h
index 56805e4..472863b 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -36,11 +36,22 @@ class HdfsColumnarScanner : public HdfsScanner {
   HdfsColumnarScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
   virtual ~HdfsColumnarScanner();
 
+  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
+
   /// Codegen ProcessScratchBatch(). Stores the resulting function in
   /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
   static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
       llvm::Function** process_scratch_batch_fn);
 
+  /// Add sync read related counters.
+  void AddSyncReadBytesCounter(int64_t total_bytes);
+
+  /// Add async read related counters.
+  void AddAsyncReadBytesCounter(int64_t total_bytes);
+
+  /// Add skipped bytes related counters.
+  void AddSkippedReadBytesCounter(int64_t total_bytes);
+
   /// Class name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
 
@@ -49,6 +60,9 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// top-level tuples. See AssembleRows() in the derived classes.
   boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
 
+  /// Scan range for the metadata.
+  const io::ScanRange* metadata_range_ = nullptr;
+
   typedef int (*ProcessScratchBatchFn)(HdfsColumnarScanner*, RowBatch*);
   /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   /// Function type: ProcessScratchBatchFn
@@ -59,6 +73,9 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// Returns the number of tuples that should be committed to the given batch.
   int FilterScratchBatch(RowBatch* row_batch);
 
+  /// Get filename of the scan range.
+  const char* filename() const { return metadata_range_->file(); }
+
   /// Evaluates runtime filters and conjuncts (if any) against the tuples in
   /// 'scratch_batch_', and adds the surviving tuples to the given batch.
   /// Transfers the ownership of tuple memory to the target batch when the
@@ -71,6 +88,60 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// materialized tuples. This is a separate function so it can be codegened.
   int ProcessScratchBatch(RowBatch* dst_batch);
 
+  /// List of pair of (column index, reservation allocated).
+  typedef std::vector<std::pair<int, int64_t>> ColumnReservations;
+  /// List of column range lengths.
+  typedef std::vector<int64_t> ColumnRangeLengths;
+
+  /// Decides how to divide context_->total_reservation() between the columns. May
+  /// increase the reservation if more reservation would enable more efficient I/O for
+  /// the current columns being scanned. On success, stores the (column index,
+  /// reservation) pairs for 'col_range_lengths' in 'reservation_per_column'.
+  Status DivideReservationBetweenColumns(const ColumnRangeLengths& col_range_lengths,
+      ColumnReservations& reservation_per_column);
+
+  /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
+  /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
+  /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
+  /// a vector with an entry per column with the index into 'col_range_lengths' and the
+  /// amount of reservation in bytes to give to that column.
+  static ColumnReservations DivideReservationBetweenColumnsHelper(int64_t min_buffer_size,
+      int64_t max_buffer_size, const ColumnRangeLengths& col_range_lengths,
+      int64_t reservation_to_distribute);
+
+  /// Compute the ideal reservation to scan a file with scan range lengths
+  /// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr
+  /// in ExecEnv.
+  static int64_t ComputeIdealReservation(const ColumnRangeLengths& col_range_lengths);
+
+  /// Number of columns that need to be read.
+  RuntimeProfile::Counter* num_cols_counter_;
+
+  /// Number of scanners that end up doing no reads because their splits don't overlap
+  /// with the midpoint of any row-group/stripe in the file.
+  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
+
+  /// Average and min/max time spent processing the footer by each split.
+  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_;
+
+  /// Average and min/max memory reservation for scanning a row group (parquet) or
+  /// stripe (orc), both ideal (calculated based on min and max buffer size) and actual.
+  RuntimeProfile::SummaryStatsCounter* columnar_scanner_ideal_reservation_counter_;
+  RuntimeProfile::SummaryStatsCounter* columnar_scanner_actual_reservation_counter_;
+
+  /// Number of stream read requests done in sync and async manners, and the total.
+  RuntimeProfile::Counter* io_sync_request_;
+  RuntimeProfile::Counter* io_async_request_;
+  RuntimeProfile::Counter* io_total_request_;
+
+  /// Number of bytes read from streams in sync and async manners, and the total.
+  RuntimeProfile::Counter* io_sync_bytes_;
+  RuntimeProfile::Counter* io_async_bytes_;
+  RuntimeProfile::Counter* io_total_bytes_;
+
+  /// Total number of bytes skipped during stream reading.
+  RuntimeProfile::Counter* io_skipped_bytes_;
+
  private:
   int ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_batch);
 };
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 3d2a49c..83f120c 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -18,11 +18,11 @@
 #include "exec/hdfs-orc-scanner.h"
 
 #include <queue>
+#include <set>
 
 #include "exec/exec-node.inline.h"
 #include "exec/orc-column-readers.h"
 #include "exec/scanner-context.inline.h"
-#include "exec/scratch-tuple-batch.h"
 #include "exprs/expr.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/collection-value-builder.h"
@@ -39,6 +39,8 @@
 using namespace impala;
 using namespace impala::io;
 
+namespace impala {
+
 Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   DCHECK(!files.empty());
@@ -52,8 +54,6 @@ Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   return IssueFooterRanges(scan_node, THdfsFileFormat::ORC, files);
 }
 
-namespace impala {
-
 HdfsOrcScanner::OrcMemPool::OrcMemPool(HdfsOrcScanner* scanner)
     : scanner_(scanner), mem_tracker_(scanner_->scan_node_->mem_tracker()) {
 }
@@ -101,22 +101,48 @@ void HdfsOrcScanner::OrcMemPool::free(char* p) {
   chunk_sizes_.erase(p);
 }
 
-// TODO: improve this to use async IO (IMPALA-6636).
 void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
     uint64_t offset) {
+  Status status;
+  if (scanner_->IsInFooterRange(offset, length)) {
+    status = scanner_->ReadFooterStream(buf, length, offset);
+  } else {
+    ColumnRange* columnRange = scanner_->FindColumnRange(length, offset);
+    if (columnRange == nullptr) {
+      status = readRandom(buf, length, offset);
+    } else if (offset < columnRange->current_position_) {
+      VLOG_QUERY << Substitute(
+          "ORC read request to already read range. Falling back to readRandom. "
+          "offset: $0 length: $1 $2",
+          offset, length, columnRange->debug());
+      status = readRandom(buf, length, offset);
+    } else {
+      status = columnRange->read(buf, length, offset);
+    }
+  }
+  if (!status.ok()) throw ResourceError(status);
+}
+
+Status HdfsOrcScanner::ScanRangeInputStream::readRandom(
+    void* buf, uint64_t length, uint64_t offset) {
+  if (offset + length > getLength()) {
+    string msg = Substitute("Invalid read offset/length on ORC file $0. offset: $1 "
+                            "length: $2 file_length: $3.",
+        filename_, offset, length, getLength());
+    return Status(msg);
+  }
+
   const ScanRange* metadata_range = scanner_->metadata_range_;
   const ScanRange* split_range =
       reinterpret_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
   int64_t partition_id = scanner_->context_->partition_descriptor()->id();
 
-  // Set expected_local to false to avoid cache on stale data (IMPALA-6830)
-  bool expected_local = false;
+  bool expected_local = split_range->ExpectedLocalRead(offset, length);
   int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
       metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
       split_range->disk_id(), expected_local, split_range->mtime(),
       BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
-
   unique_ptr<BufferDescriptor> io_buffer;
   Status status;
   {
@@ -128,8 +154,144 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
     DCHECK(!status.ok() || !needs_buffers) << "Already provided a buffer";
     if (status.ok()) status = range->GetNext(&io_buffer);
   }
-  if (io_buffer != nullptr) range->ReturnBuffer(move(io_buffer));
-  if (!status.ok()) throw ResourceError(status);
+  if (io_buffer != nullptr) {
+    DCHECK_EQ(io_buffer->len(), length);
+    scanner_->AddSyncReadBytesCounter(length);
+    range->ReturnBuffer(move(io_buffer));
+  }
+  return status;
+}
+
+bool useAsyncIoForStream(orc::StreamKind kind) {
+  switch (kind) {
+    case orc::StreamKind_DATA:
+    case orc::StreamKind_LENGTH:
+    case orc::StreamKind_SECONDARY:
+    case orc::StreamKind_DICTIONARY_DATA:
+    case orc::StreamKind_DICTIONARY_COUNT:
+    case orc::StreamKind_PRESENT:
+      return true;
+      // We skip Async IO for the following stream kind. We expect that these streams will
+      // be read in one batch, so async reading does not help much.
+      // They also not too large.
+    case orc::StreamKind_ROW_INDEX:
+    case orc::StreamKind_BLOOM_FILTER:
+    case orc::StreamKind_BLOOM_FILTER_UTF8:
+      return false;
+    default:
+      DCHECK(false);
+  }
+}
+
+Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe) {
+  columnRanges_.clear();
+
+  const std::list<uint64_t>& selected_type_ids = selected_type_ids_;
+  // Collect the stream belonging to selected columns.
+  set<uint64_t> column_id_set(selected_type_ids.begin(), selected_type_ids.end());
+  try {
+    uint64_t stream_count = stripe.getNumberOfStreams();
+    for (uint64_t stream_id = 0; stream_id < stream_count; stream_id++) {
+      unique_ptr<orc::StreamInformation> stream = stripe.getStreamInformation(stream_id);
+      if (column_id_set.find(stream->getColumnId()) == column_id_set.end()) continue;
+      if (!useAsyncIoForStream(stream->getKind())) continue;
+      if (stream->getLength() == 0) continue;
+
+      columnRanges_.emplace_back(stream->getLength(), stream->getOffset(),
+          stream->getKind(), stream->getColumnId(), this);
+    }
+  } catch (ResourceError& e) { // errors throw from the orc scanner
+    parse_status_ = e.GetStatus();
+    return parse_status_;
+  } catch (std::exception& e) { // other errors throw from the orc library
+    string msg = Substitute(
+        "Encountered parse error in tail of ORC file $0: $1", filename(), e.what());
+    parse_status_ = Status(msg);
+    return parse_status_;
+  }
+
+  // Sort and check that there is no overlapping range in columnRanges_.
+  sort(columnRanges_.begin(), columnRanges_.end());
+  uint64_t last_end = 0;
+  for (const ColumnRange& range : columnRanges_) {
+    if (last_end > range.offset_) {
+      string msg =
+          Substitute("Overlapping ORC column ranges. Last end: $0 Current offset: $1",
+              last_end, range.offset_);
+      return Status(msg);
+    }
+    last_end = range.offset_ + range.length_;
+  }
+
+  // Divide reservation between columns.
+  ColumnRangeLengths col_range_lengths(columnRanges_.size());
+  for (int i = 0; i < columnRanges_.size(); ++i) {
+    col_range_lengths[i] = columnRanges_[i].length_;
+  }
+  ColumnReservations tmp_reservations;
+  RETURN_IF_ERROR(DivideReservationBetweenColumns(col_range_lengths, tmp_reservations));
+  for (auto& tmp_reservation : tmp_reservations) {
+    columnRanges_[tmp_reservation.first].io_reservation = tmp_reservation.second;
+  }
+
+  int64_t partition_id = context_->partition_descriptor()->id();
+  const ScanRange* split_range =
+      static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
+  for (ColumnRange& range : columnRanges_) {
+    // Determine if the column is completely contained within a local split.
+    bool col_range_local = split_range->ExpectedLocalRead(range.offset_, range.length_);
+
+    int file_length = scan_node_->GetFileDesc(partition_id, filename())->file_length;
+    if (range.offset_ + range.length_ > file_length) {
+      string msg = Substitute("Invalid read len.");
+      return Status(msg);
+    }
+    ScanRange* scan_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
+        filename(), range.length_, range.offset_, partition_id, split_range->disk_id(),
+        col_range_local, split_range->mtime(), BufferOpts(split_range->cache_options()));
+    RETURN_IF_ERROR(
+        context_->AddAndStartStream(scan_range, range.io_reservation, &range.stream_));
+  }
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::ColumnRange::read(void* buf, uint64_t length, uint64_t offset) {
+  if (offset + length > offset_ + length_) {
+    string msg = Substitute("ORC read request out of range. offset: $0 length: $1 $2",
+        offset, length, debug());
+    return Status(msg);
+  }
+
+  DCHECK(offset >= current_position_);
+  Status status;
+  if (offset > current_position_) {
+    // skip the non-requested range
+    uint64_t bytes_to_skip = offset - current_position_;
+    if (!stream_->SkipBytes(bytes_to_skip, &status)) {
+      LOG(ERROR) << Substitute(
+          "HdfsOrcScanner::ColumnRange::read skipping failed. offset: $0 length: $1 $2",
+          offset, length, debug());
+      return status;
+    }
+    scanner_->AddSkippedReadBytesCounter(bytes_to_skip);
+    current_position_ = offset;
+  }
+
+  uint8_t* stream_buf = nullptr;
+  {
+    SCOPED_TIMER2(scanner_->state_->total_storage_wait_timer(),
+        scanner_->scan_node_->scanner_io_wait_time());
+    if (!stream_->ReadBytes(length, &stream_buf, &status, false)) return status;
+    scanner_->AddAsyncReadBytesCounter(length);
+  }
+  CHECK_NOTNULL(stream_buf);
+  memcpy(buf, stream_buf, length); // TODO: ORC-262: extend ORC interface to avoid copy.
+  current_position_ += length;
+  bool done = current_position_ == offset_ + length_;
+  DCHECK_EQ(current_position_, stream_->file_offset());
+  DCHECK_EQ(done, stream_->bytes_left() == 0);
+  stream_->ReleaseCompletedResources(done);
+  return Status::OK();
 }
 
 HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
@@ -145,16 +307,10 @@ HdfsOrcScanner::~HdfsOrcScanner() {
 }
 
 Status HdfsOrcScanner::Open(ScannerContext* context) {
-  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  RETURN_IF_ERROR(HdfsColumnarScanner::Open(context));
   metadata_range_ = stream_->scan_range();
-  num_cols_counter_ =
-      ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcColumns", TUnit::UNIT);
   num_stripes_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcStripes", TUnit::UNIT);
-  num_scanners_with_no_reads_counter_ =
-      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
-  process_footer_timer_stats_ =
-      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "OrcFooterProcessingTime");
 
   codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::ORC);
   if (codegend_process_scratch_batch_fn_ == nullptr) {
@@ -270,7 +426,6 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
     parse_status_ = Status(msg);
     return parse_status_;
   }
-
   // Set top-level template tuple.
   template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
   return Status::OK();
@@ -325,10 +480,13 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
 }
 
 Status HdfsOrcScanner::ProcessFileTail() {
-  unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
-  VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
-      << ", length: " << input_stream->getLength();
   try {
+    // ScanRangeInputStream keeps a pointer to this HdfsOrcScanner so we can hack
+    // async IO behind the orc::InputStream interface. The ranges of the
+    // selected columns will be updated when starting new stripes.
+    unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
+    VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
+              << ", file_length: " << input_stream->getLength();
     reader_ = orc::createReader(move(input_stream), reader_options_);
   } catch (ResourceError& e) {  // errors throw from the orc scanner
     parse_status_ = e.GetStatus();
@@ -745,6 +903,9 @@ Status HdfsOrcScanner::NextStripe() {
     }
 
     COUNTER_ADD(num_stripes_counter_, 1);
+    if (state_->query_options().orc_async_read) {
+      RETURN_IF_ERROR(StartColumnReading(*stripe.get()));
+    }
     row_reader_options_.range(stripe->getOffset(), stripe_len);
     try {
       row_reader_ = reader_->createRowReader(row_reader_options_);
@@ -752,9 +913,9 @@ Status HdfsOrcScanner::NextStripe() {
       parse_status_ = e.GetStatus();
       return parse_status_;
     } catch (std::exception& e) { // errors throw from the orc library
-      VLOG_QUERY << "Error in creating ORC column readers: " << e.what();
-      parse_status_ = Status(
-          Substitute("Error in creating ORC column readers: $0.", e.what()));
+      parse_status_ = Status(Substitute(
+          "Error in creating column readers for ORC file $0: $1.", filename(), e.what()));
+      VLOG_QUERY << parse_status_.msg().msg();
       return parse_status_;
     }
     end_of_stripe_ = false;
@@ -794,8 +955,9 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
         parse_status_ = e.GetStatus();
         return parse_status_;
       } catch (std::exception& e) {
-        VLOG_QUERY << "Encounter parse error: " << e.what();
-        parse_status_ = Status(Substitute("Encounter parse error: $0.", e.what()));
+        parse_status_ = Status(Substitute(
+            "Encounter parse error in ORC file $0: $1.", filename(), e.what()));
+        VLOG_QUERY << parse_status_.msg().msg();
         eos_ = true;
         return parse_status_;
       }
@@ -1201,4 +1363,30 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
   return Status::OK();
 }
 
+// Copies 'length' bytes starting at file offset 'offset' from the scanner's 'stream_'
+// into 'buf'. This function only moves the stream forward: bytes between the current
+// stream position and 'offset' are skipped. Returns a non-OK status if 'offset' lies
+// behind the current stream position, or if skipping or reading from the stream fails.
+Status HdfsOrcScanner::ReadFooterStream(void* buf, uint64_t length, uint64_t offset) {
+  Status status;
+  if (offset < stream_->file_offset()) {
+    // Nothing below can move the stream backwards, so reading now would silently copy
+    // bytes from the wrong position. Fail explicitly instead.
+    return Status(Substitute("HdfsOrcScanner::ReadFooterStream cannot read backwards. "
+                             "offset: $0 length: $1 current_offset: $2",
+        offset, length, stream_->file_offset()));
+  }
+  if (offset > stream_->file_offset()) {
+    // skip the non-requested range
+    uint64_t bytes_to_skip = offset - stream_->file_offset();
+    if (!stream_->SkipBytes(bytes_to_skip, &status)) {
+      LOG(ERROR) << Substitute("HdfsOrcScanner::ReadFooterStream skipping failed. "
+                               "offset: $0 length: $1 current_offset: $2",
+          offset, length, stream_->file_offset());
+      return status;
+    }
+    AddSkippedReadBytesCounter(bytes_to_skip);
+  }
+
+  uint8_t* stream_buf = nullptr;
+  {
+    // Only the stream read itself is charged to the storage/IO wait timers.
+    SCOPED_TIMER2(state_->total_storage_wait_timer(), scan_node_->scanner_io_wait_time());
+    if (!stream_->ReadBytes(length, &stream_buf, &status, false)) return status;
+    AddAsyncReadBytesCounter(length);
+  }
+  CHECK_NOTNULL(stream_buf);
+  memcpy(buf, stream_buf, length); // TODO: ORC-262: extend ORC interface to avoid copy.
+  // Once the stream is fully consumed, let it release all of its resources.
+  bool done = stream_->bytes_left() == 0;
+  stream_->ReleaseCompletedResources(done);
+  return Status::OK();
+}
 }
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 0e7c0cb..67159ee 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -91,6 +91,39 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
     boost::unordered_map<char*, uint64_t> chunk_sizes_;
   };
 
+  /// A contiguous byte range of the file whose data is read through 'stream_'.
+  /// Ranges are ordered by their start offset (see operator<).
+  struct ColumnRange {
+    /// File offset of the first byte of the range.
+    uint64_t offset_;
+    /// Length of the range in bytes.
+    uint64_t length_;
+    /// File offset of the next byte to read. Starts at 'offset_' and advances as
+    /// read() consumes data.
+    uint64_t current_position_;
+    /// ORC stream kind of the range; reported by debug().
+    orc::StreamKind kind_;
+    /// Type id associated with the range; reported as 'typeId' by debug().
+    uint64_t type_id_;
+    /// Scanner this range belongs to. Used for its counters, timers and file name.
+    HdfsOrcScanner* scanner_;
+    /// Stream the data is read from. nullptr until a stream is assigned.
+    ScannerContext::Stream* stream_ = nullptr;
+    /// I/O reservation for this range; starts at 0.
+    /// NOTE(review): presumably in bytes and assigned when the reservation is divided
+    /// between ranges -- confirm against StartColumnReading().
+    int io_reservation = 0;
+
+    /// Creates a range covering [offset, offset+length) with the read position at
+    /// 'offset'. Note the parameter order: 'length' comes before 'offset'.
+    ColumnRange(uint64_t length, uint64_t offset, orc::StreamKind kind, uint64_t type_id,
+        HdfsOrcScanner* scanner)
+      : offset_(offset),
+        length_(length),
+        current_position_(offset),
+        kind_(kind),
+        type_id_(type_id),
+        scanner_(scanner) {}
+
+    /// Orders ranges by start offset only.
+    bool operator<(const ColumnRange& other) const { return offset_ < other.offset_; }
+
+    /// Read 'length' bytes from the stream_ starting at 'offset' into 'buf'.
+    Status read(void* buf, uint64_t length, uint64_t offset);
+
+    /// Returns a human-readable description of this range. Does not modify the range,
+    /// so it can be called on const objects.
+    std::string debug() const {
+      return strings::Substitute(
+          "colrange_offset: $0 colrange_length: $1 colrange_pos: $2 "
+          "typeId: $3 kind: $4 filename: $5",
+          offset_, length_, current_position_, type_id_, orc::streamKindToString(kind_),
+          scanner_->filename());
+    }
+  };
+
   /// A wrapper of DiskIoMgr to be used by the ORC lib.
   class ScanRangeInputStream : public orc::InputStream {
    public:
@@ -101,18 +134,21 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
           scanner->context_->partition_descriptor()->id(), filename_);
     }
 
+    /// Get the total length of the file in bytes.
     uint64_t getLength() const override {
       return file_desc_->file_length;
     }
 
-    uint64_t getNaturalReadSize() const override {
-      return ExecEnv::GetInstance()->disk_io_mgr()->max_buffer_size();
-    }
+    /// Get the natural size for reads.
+    /// Return 0 to let the ORC lib decide the best buffer size depending on the
+    /// compression method that is being used.
+    uint64_t getNaturalReadSize() const override { return 0; }
 
     /// Read 'length' bytes from the file starting at 'offset' into the buffer starting
     /// at 'buf'.
     void read(void* buf, uint64_t length, uint64_t offset) override;
 
+    /// Get the name of the stream for error messages.
     const std::string& getName() const override {
       return filename_;
     }
@@ -121,6 +157,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
     HdfsOrcScanner* scanner_;
     const HdfsFileDesc* file_desc_;
     std::string filename_;
+
+    /// Default read implementation for non async IO.
+    Status readRandom(void* buf, uint64_t length, uint64_t offset);
   };
 
   HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
@@ -220,9 +259,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   std::unordered_map<const SlotDescriptor*, uint64_t> slot_to_col_id_;
   std::unordered_map<const TupleDescriptor*, uint64_t> tuple_to_col_id_;
 
-  /// Scan range for the metadata (file tail).
-  const io::ScanRange* metadata_range_ = nullptr;
-
   /// With the help of it we can check the validity of ACID write ids.
   ValidWriteIdList valid_write_ids_;
 
@@ -242,22 +278,17 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// For files not written by Streaming Ingestion we can assume that every row is valid.
   bool row_batches_need_validation_ = false;
 
+  /// Scan ranges for the column streams.
+  /// StartColumnReading() guarantees that columnRanges_ is sorted by the elements'
+  /// offsets and that no two ranges overlap.
+  vector<ColumnRange> columnRanges_;
+
   /// Timer for materializing rows. This ignores time getting the next buffer.
   ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
 
-  /// Average and min/max time spent processing the footer by each split.
-  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_ = nullptr;
-
-  /// Number of columns that need to be read.
-  RuntimeProfile::Counter* num_cols_counter_ = nullptr;
-
   /// Number of stripes that need to be read.
   RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
 
-  /// Number of scanners that end up doing no reads because their splits don't overlap
-  /// with the midpoint of any stripe in the file.
-  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_ = nullptr;
-
   /// Number of collection items read in current row batch. It is a scanner-local counter
   /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
   /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
@@ -272,6 +303,19 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// row_reader_ to scan it.
   Status NextStripe() WARN_UNUSED_RESULT;
 
+  /// Begin reading columns of given stripe.
+  Status StartColumnReading(const orc::StripeInformation& stripe);
+
+  /// Find the ColumnRange that fully contains the byte range [offset, offset+length).
+  /// Return nullptr if no such range exists.
+  inline ColumnRange* FindColumnRange(uint64_t length, uint64_t offset) {
+    auto in_range = [length, offset](const ColumnRange& c) {
+      return (offset >= c.offset_) && (offset + length <= c.offset_ + c.length_);
+    };
+    auto range = std::find_if(columnRanges_.begin(), columnRanges_.end(), in_range);
+    return range != columnRanges_.end() ? &(*range) : nullptr;
+  }
+
   /// Reads data to materialize instances of 'tuple_desc'.
   /// Returns a non-OK status if a non-recoverable error was encountered and execution
   /// of this query should be terminated immediately.
@@ -345,6 +389,15 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// with the assumption that the specifit child is a literal.
   orc::Literal GetSearchArgumentLiteral(ScalarExprEvaluator* eval, int child_idx,
       const ColumnType& dst_type, orc::PredicateDataType* predicate_type);
+
+  /// Return true if [offset, offset+length) is within the remaining part of the footer
+  /// stream (initial scan range issued by IssueInitialRanges).
+  inline bool IsInFooterRange(uint64_t offset, uint64_t length) const {
+    DCHECK(stream_ != nullptr);
+    // The remaining part is [file_offset, file_offset + bytes_left). Comparing only
+    // 'length' against bytes_left() would accept a range that starts past file_offset
+    // and runs beyond the end of the stream, so bound the end of the range instead.
+    return offset >= stream_->file_offset()
+        && offset + length <= stream_->file_offset() + stream_->bytes_left();
+  }
+
+  Status ReadFooterStream(void* buf, uint64_t length, uint64_t offset);
 };
 
 } // namespace impala
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index aea118a..2a9ed1f 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -68,8 +68,6 @@ const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
 static const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
 
-static const string IDEAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupIdealReservation";
-static const string ACTUAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupActualReservation";
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
@@ -91,19 +89,15 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     advance_row_group_(true),
     min_max_tuple_(nullptr),
     row_batches_produced_(0),
-    metadata_range_(nullptr),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
     stats_batch_read_pool_(new MemPool(scan_node->mem_tracker())),
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
-    process_footer_timer_stats_(nullptr),
-    num_cols_counter_(nullptr),
     num_stats_filtered_row_groups_counter_(nullptr),
     num_minmax_filtered_row_groups_counter_(nullptr),
     num_bloom_filtered_row_groups_counter_(nullptr),
     num_rowgroups_skipped_by_unuseful_filters_counter_(nullptr),
     num_row_groups_counter_(nullptr),
     num_minmax_filtered_pages_counter_(nullptr),
-    num_scanners_with_no_reads_counter_(nullptr),
     num_dict_filtered_row_groups_counter_(nullptr),
     parquet_compressed_page_size_counter_(nullptr),
     parquet_uncompressed_page_size_counter_(nullptr),
@@ -116,10 +110,8 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
 }
 
 Status HdfsParquetScanner::Open(ScannerContext* context) {
-  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  RETURN_IF_ERROR(HdfsColumnarScanner::Open(context));
   metadata_range_ = stream_->scan_range();
-  num_cols_counter_ =
-      ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
   num_stats_filtered_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredRowGroups",
           TUnit::UNIT);
@@ -145,22 +137,14 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   num_pages_skipped_by_late_materialization_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumPagesSkippedByLateMaterialization",
           TUnit::UNIT);
-  num_scanners_with_no_reads_counter_ =
-      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
   num_dict_filtered_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
-  process_footer_timer_stats_ =
-      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime");
   parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
       scan_node_->runtime_profile(), "ParquetCompressedPageSize", TUnit::BYTES);
   parquet_uncompressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
       scan_node_->runtime_profile(), "ParquetUncompressedPageSize", TUnit::BYTES);
   process_page_index_stats_ =
       ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "PageIndexProcessingTime");
-  row_group_ideal_reservation_counter_ = ADD_SUMMARY_STATS_COUNTER(
-      scan_node_->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
-  row_group_actual_reservation_counter_ = ADD_SUMMARY_STATS_COUNTER(
-      scan_node_->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
 
   codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET);
   if (codegend_process_scratch_batch_fn_ == nullptr) {
@@ -1967,6 +1951,7 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
   DCHECK_EQ(io_buffer->buffer(), buffer);
   DCHECK_EQ(io_buffer->len(), size);
   DCHECK(io_buffer->eosr());
+  AddSyncReadBytesCounter(io_buffer->len());
   object_range->ReturnBuffer(move(io_buffer));
   return Status::OK();
 }
@@ -2920,129 +2905,19 @@ Status HdfsParquetScanner::InitScalarColumns() {
     }
     RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
   }
-  RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_));
-  return Status::OK();
-}
 
-Status HdfsParquetScanner::DivideReservationBetweenColumns(
-    const vector<BaseScalarColumnReader*>& column_readers) {
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  const int64_t min_buffer_size = io_mgr->min_buffer_size();
-  const int64_t max_buffer_size = io_mgr->max_buffer_size();
-  // The HdfsScanNode reservation calculation in the planner ensures that we have
-  // reservation for at least one buffer per column.
-  if (context_->total_reservation() < min_buffer_size * column_readers.size()) {
-    return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
-                   "least $1 bytes per column for $2 columns but had $3 bytes",
-            filename(), min_buffer_size, column_readers.size(),
-            context_->total_reservation()));
-  }
-
-  vector<int64_t> col_range_lengths(column_readers.size());
-  for (int i = 0; i < column_readers.size(); ++i) {
-    col_range_lengths[i] = column_readers[i]->scan_range()->bytes_to_read();
-  }
-
-  // The scanner-wide stream was used only to read the file footer.  Each column has added
-  // its own stream. We can use the total reservation now that 'stream_''s resources have
-  // been released. We may benefit from increasing reservation further, so let's compute
-  // the ideal reservation to scan all the columns.
-  int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths);
-  if (ideal_reservation > context_->total_reservation()) {
-    context_->TryIncreaseReservation(ideal_reservation);
-  }
-  row_group_actual_reservation_counter_->UpdateCounter(context_->total_reservation());
-  row_group_ideal_reservation_counter_->UpdateCounter(ideal_reservation);
-
-  vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
-      min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation());
-  for (auto& tmp_reservation : tmp_reservations) {
-    column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
+  ColumnRangeLengths col_range_lengths(scalar_readers_.size());
+  for (int i = 0; i < scalar_readers_.size(); ++i) {
+    col_range_lengths[i] = scalar_readers_[i]->scan_range()->bytes_to_read();
   }
-  return Status::OK();
-}
 
-int64_t HdfsParquetScanner::ComputeIdealReservation(
-    const vector<int64_t>& col_range_lengths) {
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  int64_t ideal_reservation = 0;
-  for (int64_t len : col_range_lengths) {
-    ideal_reservation += io_mgr->ComputeIdealBufferReservation(len);
-  }
-  return ideal_reservation;
-}
-
-vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
-    int64_t min_buffer_size, int64_t max_buffer_size,
-    const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
-  // Pair of (column index, reservation allocated).
-  vector<pair<int, int64_t>> tmp_reservations;
-  for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
-
-  // Sort in descending order of length, breaking ties by index so that larger columns
-  // get allocated reservation first. It is common to have dramatically different column
-  // sizes in a single file because of different value sizes and compressibility. E.g.
-  // consider a large STRING "comment" field versus a highly compressible
-  // dictionary-encoded column with only a few distinct values. We want to give max-sized
-  // buffers to large columns first to maximize the size of I/Os that we do while reading
-  // this row group.
-  sort(tmp_reservations.begin(), tmp_reservations.end(),
-      [&col_range_lengths](
-          const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
-        int64_t left_len = col_range_lengths[left.first];
-        int64_t right_len = col_range_lengths[right.first];
-        return left_len != right_len ? left_len > right_len : left.first < right.first;
-      });
-
-  // Set aside the minimum reservation per column.
-  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
-
-  // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
-  // or a large enough buffer to fit the remaining data for each column. Do this
-  // round-robin up to the ideal number of I/O buffers.
-  for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
-    for (auto& tmp_reservation : tmp_reservations) {
-      // Add back the reservation we set aside above.
-      if (i == 0) reservation_to_distribute += min_buffer_size;
-
-      int64_t bytes_left_in_range =
-          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
-      int64_t bytes_to_add;
-      if (bytes_left_in_range >= max_buffer_size) {
-        if (reservation_to_distribute >= max_buffer_size) {
-          bytes_to_add = max_buffer_size;
-        } else if (i == 0) {
-          DCHECK_EQ(0, tmp_reservation.second);
-          // Ensure this range gets at least one buffer on the first iteration.
-          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
-        } else {
-          DCHECK_GT(tmp_reservation.second, 0);
-          // We need to read more than the max buffer size, but can't allocate a
-          // max-sized buffer. Stop adding buffers to this column: we prefer to use
-          // the existing max-sized buffers without small buffers mixed in so that
-          // we will alway do max-sized I/Os, which make efficient use of I/O devices.
-          bytes_to_add = 0;
-        }
-      } else if (bytes_left_in_range > 0 &&
-          reservation_to_distribute >= min_buffer_size) {
-        // Choose a buffer size that will fit the rest of the bytes left in the range.
-        bytes_to_add =
-            max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
-        // But don't add more reservation than is available.
-        bytes_to_add =
-            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
-      } else {
-        bytes_to_add = 0;
-      }
-      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
-      reservation_to_distribute -= bytes_to_add;
-      tmp_reservation.second += bytes_to_add;
-      DCHECK_GE(reservation_to_distribute, 0);
-      DCHECK_GT(tmp_reservation.second, 0);
-    }
+  ColumnReservations reservation_per_column;
+  RETURN_IF_ERROR(
+      DivideReservationBetweenColumns(col_range_lengths, reservation_per_column));
+  for (auto& col_reservation : reservation_per_column) {
+    scalar_readers_[col_reservation.first]->set_io_reservation(col_reservation.second);
   }
-  return tmp_reservations;
+  return Status::OK();
 }
 
 Status HdfsParquetScanner::InitDictionaries(
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 2e2e334..da37732 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -439,9 +439,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Version of the application that wrote this file.
   ParquetFileVersion file_version_;
 
-  /// Scan range for the metadata.
-  const io::ScanRange* metadata_range_;
-
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -493,15 +490,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Timer for materializing rows.  This ignores time getting the next buffer.
   ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
 
-  /// Average and min/max time spent processing the footer by each split.
-  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_;
-
   /// Average and min/max time spent processing the page index for each row group.
   RuntimeProfile::SummaryStatsCounter* process_page_index_stats_;
 
-  /// Number of columns that need to be read.
-  RuntimeProfile::Counter* num_cols_counter_;
-
   /// Number of row groups that are skipped because of Parquet statistics, either by
   /// row group level statistics, or page level statistics.
   RuntimeProfile::Counter* num_stats_filtered_row_groups_counter_;
@@ -537,10 +528,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// rows that survived filtering.
   RuntimeProfile::Counter* num_pages_skipped_by_late_materialization_counter_;
 
-  /// Number of scanners that end up doing no reads because their splits don't overlap
-  /// with the midpoint of any row-group in the file.
-  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
-
   /// Number of row groups skipped due to dictionary filter. This is an aggregated counter
   /// that includes the number of filtered row groups as a result of evaluating conjuncts
   /// and runtime bloom filters on the dictionary entries.
@@ -556,11 +543,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// to this counter
   RuntimeProfile::SummaryStatsCounter* parquet_uncompressed_page_size_counter_;
 
-  /// Average and min/max memory reservation for a scanning a row group, both
-  /// ideal(calculated based on min and max buffer size) and actual.
-  RuntimeProfile::SummaryStatsCounter* row_group_ideal_reservation_counter_;
-  RuntimeProfile::SummaryStatsCounter* row_group_actual_reservation_counter_;
-
   /// Number of collection items read in current row batch. It is a scanner-local counter
   /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
   /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
@@ -583,8 +565,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// that spans entire batch of length 'scratch_batch_->capacity'.
   ScratchMicroBatch complete_micro_batch_;
 
-  const char* filename() const { return metadata_range_->file(); }
-
   virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Return true if we can evaluate this type of predicate on parquet statistic.
@@ -832,27 +812,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// does not start any scan ranges.
   Status InitScalarColumns() WARN_UNUSED_RESULT;
 
-  /// Decides how to divide stream_->reservation() between the columns. May increase
-  /// the reservation if more reservation would enable more efficient I/O for the
-  /// current columns being scanned. Sets the reservation on each corresponding reader
-  /// in 'column_readers'.
-  Status DivideReservationBetweenColumns(
-      const std::vector<BaseScalarColumnReader*>& column_readers);
-
-  /// Compute the ideal reservation to scan a file with scan range lengths
-  /// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr
-  /// in ExecEnv.
-  static int64_t ComputeIdealReservation(const std::vector<int64_t>& col_range_lengths);
-
-  /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
-  /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
-  /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
-  /// a vector with an entry per column with the index into 'col_range_lengths' and the
-  /// amount of reservation in bytes to give to that column.
-  static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
-      int64_t min_buffer_size, int64_t max_buffer_size,
-      const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
-
   /// Initializes the column readers in collection_readers_.
   void InitCollectionColumns();
 
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index 8e63b4f..e46edfa 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -113,6 +113,7 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
   DCHECK_EQ(io_buffer->buffer(), page_index_buffer_.buffer());
   DCHECK_EQ(io_buffer->len(), page_index_buffer_.Size());
   DCHECK(io_buffer->eosr());
+  scanner_->AddSyncReadBytesCounter(io_buffer->len());
   object_range->ReturnBuffer(move(io_buffer));
 
   return Status::OK();
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index d4a4d4e..972b326 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -86,9 +86,7 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
   const ScanRange* split_range =
       static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
   // Determine if the column is completely contained within a local split.
-  bool col_range_local = split_range->expected_local()
-      && col_start >= split_range->offset()
-      && col_end <= split_range->offset() + split_range->len();
+  bool col_range_local = split_range->ExpectedLocalRead(col_start, col_len);
   scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
       filename(), col_len, col_start, move(sub_ranges),
       partition_id, split_range->disk_id(),
@@ -104,16 +102,8 @@ Status ParquetPageReader::StartScan(int io_reservation) {
   DCHECK_GT(io_reservation, 0);
   DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
 
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  ScannerContext* context = parent_->context_;
-  bool needs_buffers;
-  RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
-        scan_range_, &needs_buffers));
-  if (needs_buffers) {
-    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-          context->bp_client(), scan_range_, io_reservation));
-  }
-  stream_ = parent_->context_->AddStream(scan_range_, io_reservation);
+  RETURN_IF_ERROR(
+      parent_->context_->AddAndStartStream(scan_range_, io_reservation, &stream_));
   DCHECK(stream_ != nullptr);
 
   state_ = State::ToReadHeader;
@@ -202,6 +192,7 @@ Status ParquetPageReader::ReadPageHeader(bool* eos) {
     }
   }
   RETURN_IF_ERROR(AdvanceStream(header_size));
+  parent_->AddAsyncReadBytesCounter(header_size);
   current_page_header_ = header;
   header_initialized_ = true;
   page_headers_read_++;
@@ -217,6 +208,7 @@ Status ParquetPageReader::ReadPageData(uint8_t** data) {
     DCHECK(!status.ok());
     return status;
   }
+  parent_->AddAsyncReadBytesCounter(current_page_header_.compressed_page_size);
   state_ = State::ToReadHeader;
   return Status::OK();
 }
@@ -224,6 +216,7 @@ Status ParquetPageReader::ReadPageData(uint8_t** data) {
 Status ParquetPageReader::SkipPageData() {
   DCHECK_EQ(state_, State::ToReadData);
   RETURN_IF_ERROR(AdvanceStream(current_page_header_.compressed_page_size));
+  parent_->AddSkippedReadBytesCounter(current_page_header_.compressed_page_size);
   state_ = State::ToReadHeader;
   return Status::OK();
 }
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index aeedefe..a9ac206 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -92,6 +92,20 @@ ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t rese
   return streams_.back().get();
 }
 
+// Starts 'range' on the scan node's reader context, allocates buffers for it (passing
+// 'reservation') if StartScanRange() reports that the range needs them, and then adds a
+// stream for the range to this context. On success '*stream' points to the added
+// stream; if starting the range or allocating buffers fails, the error is returned and
+// '*stream' is not written.
+Status ScannerContext::AddAndStartStream(
+    ScanRange* range, int64_t reservation, ScannerContext::Stream** stream) {
+  DCHECK(stream != nullptr);
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  // Initialized so the flag never holds an indeterminate value; StartScanRange() is
+  // expected to overwrite it.
+  bool needs_buffers = false;
+  RETURN_IF_ERROR(scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
+  if (needs_buffers) {
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(bp_client(), range, reservation));
+  }
+  *stream = AddStream(range, reservation);
+  DCHECK(*stream != nullptr);
+  return Status::OK();
+}
+
 void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
   if (done) {
     // Cancel the underlying scan range to clean up any queued buffers there
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index f4575ee..8766792 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -388,6 +388,11 @@ class ScannerContext {
   /// Returns the added stream. The returned stream is owned by this context.
   Stream* AddStream(io::ScanRange* range, int64_t reservation);
 
+  /// Add and start a stream to this ScannerContext for 'range'.
+  /// Returns the added stream through output parameter 'stream'.
+  Status AddAndStartStream(
+      io::ScanRange* range, int64_t reservation, ScannerContext::Stream** stream);
+
   /// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
   /// multi-threaded and is done (finished, cancelled or reached it's limit).
   /// In all other cases returns false.
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 95bf0e8..b22a67f 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -147,7 +147,7 @@ class DiskQueue;
 /// In case a), ReturnBuffer() may re-enqueue the buffer for GetNext() to return again if
 /// needed. E.g. if 24MB of buffers were allocated to read a 64MB scan range, each buffer
 /// must be returned multiple times. Callers must be careful to call ReturnBuffer() with
-/// the previous buffer returned from the range before calling before GetNext() so that
+/// the previous buffer returned from the range before calling GetNext() so that
 /// at least one buffer is available for the I/O mgr to read data into. Calling GetNext()
 /// when the scan range has no buffers to read data into causes a resource deadlock.
 /// NB: if the scan range was allocated N buffers, then it's always ok for the caller
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 334c2b4..11a6b1e 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -331,6 +331,11 @@ class ScanRange : public RequestRange {
   /// Closes underlying reader, if no more data will be read from this scan range.
   void CloseReader(const std::unique_lock<std::mutex>& scan_range_lock);
 
+  /// Determine if [offset,offset+length) is expected to be a local range.
+  inline bool ExpectedLocalRead(int64_t offset, int64_t length) const {
+    return expected_local_ && offset >= offset_ && (offset + length <= offset_ + len_);
+  }
+
   /// return a descriptive string for debug.
   std::string DebugString() const;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index cc06645..afb9f1e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1156,6 +1156,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_abort_java_udf_on_exception(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::ORC_ASYNC_READ: {
+        query_options->__set_orc_async_read(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4c471c3..9934555 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ABORT_JAVA_UDF_ON_EXCEPTION + 1);\
+      TImpalaQueryOptions::ORC_ASYNC_READ+ 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -271,6 +271,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(abort_java_udf_on_exception,\
       ABORT_JAVA_UDF_ON_EXCEPTION, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(orc_async_read, ORC_ASYNC_READ, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index e2a0705..9e5b1be 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -718,6 +718,9 @@ enum TImpalaQueryOptions {
   // Abort the Java UDF if an exception is thrown. Default is that only a
   // warning will be logged if the Java UDF throws an exception.
   ABORT_JAVA_UDF_ON_EXCEPTION = 140;
+
+  // Indicates whether to use ORC's async read.
+  ORC_ASYNC_READ = 141
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 0313b85..9711c7e 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -571,6 +571,9 @@ struct TQueryOptions {
   // Abort the Java UDF if an exception is thrown. Default is that only a
   // warning will be logged if the Java UDF throws an exception.
   141: optional bool abort_java_udf_on_exception = false;
+
+  // Indicates whether to use ORC's async read.
+  142: optional bool orc_async_read = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index bc2eea2..60f339f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -380,6 +380,13 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * Returns true if this scan node contains ORC.
+   */
+  private boolean hasOrc(Set<HdfsFileFormat> fileFormats) {
+    return fileFormats.contains(HdfsFileFormat.ORC);
+  }
+
+  /**
    * Returns true if the Parquet count(*) optimization can be applied to the query block
    * of this scan node.
    */
@@ -416,7 +423,7 @@ public class HdfsScanNode extends ScanNode {
       }
     }
 
-    if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+    if (hasOrc(fileFormats_)) {
       // Compute min-max conjuncts only if the ORC_READ_STATISTICS query option is
       // set to true.
       if (analyzer.getQueryOptions().orc_read_statistics) {
@@ -558,10 +565,10 @@ public class HdfsScanNode extends ScanNode {
   private boolean isUnsupportedStatsType(Type type) {
     // TODO(IMPALA-10882): Push down Min-Max predicates of CHAR/VARCHAR to ORC reader
     // TODO(IMPALA-10915): Push down Min-Max predicates of TIMESTAMP to ORC reader
-    return fileFormats_.contains(HdfsFileFormat.ORC)
+    return hasOrc(fileFormats_)
         && (type.getPrimitiveType() == PrimitiveType.CHAR
-            || type.getPrimitiveType() == PrimitiveType.VARCHAR
-            || type.getPrimitiveType() == PrimitiveType.TIMESTAMP);
+               || type.getPrimitiveType() == PrimitiveType.VARCHAR
+               || type.getPrimitiveType() == PrimitiveType.TIMESTAMP);
   }
 
   private void tryComputeBinaryStatsPredicate(Analyzer analyzer,
@@ -598,7 +605,7 @@ public class HdfsScanNode extends ScanNode {
         buildBinaryStatsPredicate(analyzer, slotRef, binaryPred,
             BinaryPredicate.Operator.GE);
       }
-      if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+      if (hasOrc(fileFormats_)) {
         // We can push down EQ predicates to the ORC reader directly.
         buildBinaryStatsPredicate(analyzer, slotRef, binaryPred, binaryPred.getOp());
       }
@@ -615,7 +622,7 @@ public class HdfsScanNode extends ScanNode {
     // Skip the slot ref if it refers to an array's "pos" field.
     if (slotDesc.isArrayPosRef()) return;
     if (inPred.isNotIn()) return;
-    if (fileFormats_.contains(HdfsFileFormat.ORC)) {
+    if (hasOrc(fileFormats_)) {
       if (isUnsupportedStatsType(slotDesc.getType())) return;
       addStatsOriginalConjunct(slotDesc.getParent(), inPred);
       buildInListStatsPredicate(analyzer, slotRef, inPred);
@@ -653,7 +660,7 @@ public class HdfsScanNode extends ScanNode {
   private void tryComputeIsNullStatsPredicate(Analyzer analyzer,
       IsNullPredicate isNullPred) {
     // Currently, only ORC table can push down IS-NULL predicates.
-    if (!fileFormats_.contains(HdfsFileFormat.ORC)) return;
+    if (!hasOrc(fileFormats_)) return;
     // Retrieve the left side of the IS-NULL predicate. Skip if it's not a simple slot.
     SlotRef slotRef = isNullPred.getBoundSlot();
     if (slotRef == null) return;
@@ -1907,7 +1914,7 @@ public class HdfsScanNode extends ScanNode {
       TupleDescriptor tupleDesc = entry.getKey();
       List<Expr> exprs = entry.getValue();
       String fileFormatStr;
-      if (hasParquet(fileFormats_) && fileFormats_.contains(HdfsFileFormat.ORC)) {
+      if (hasParquet(fileFormats_) && hasOrc(fileFormats_)) {
         fileFormatStr = "parquet/orc";
       } else {
         fileFormatStr = hasParquet(fileFormats_) ? "parquet" : "orc";
@@ -2002,8 +2009,9 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkNotNull(desc_);
     Preconditions.checkState(desc_.getTable() instanceof FeFsTable);
     List<Long> columnReservations = null;
-    if (hasParquet(fileFormats_) || fileFormats_.contains(HdfsFileFormat.ORC)) {
-      columnReservations = computeMinColumnMemReservations();
+    if (hasParquet(fileFormats_) || hasOrc(fileFormats_)) {
+      boolean orcAsyncRead = hasOrc(fileFormats_) && queryOptions.orc_async_read;
+      columnReservations = computeMinColumnMemReservations(orcAsyncRead);
     }
 
     int perHostScanRanges = 0;
@@ -2057,7 +2065,8 @@ public class HdfsScanNode extends ScanNode {
 
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinMemReservationBytes(computeMinMemReservation(columnReservations))
+        .setMinMemReservationBytes(computeMinMemReservation(
+            columnReservations, queryOptions))
         .setThreadReservation(requiredThreads).build();
   }
 
@@ -2075,7 +2084,8 @@ public class HdfsScanNode extends ScanNode {
    * - The hdfs split size, to avoid reserving excessive memory for small files or ranges,
    *   e.g. small dimension tables with very few rows.
    */
-  private long computeMinMemReservation(List<Long> columnReservations) {
+  private long computeMinMemReservation(
+      List<Long> columnReservations, TQueryOptions queryOptions) {
     Preconditions.checkState(largestScanRangeBytes_ >= 0);
     long maxIoBufferSize =
         BitUtil.roundUpToPowerOf2(BackendConfig.INSTANCE.getReadSize());
@@ -2085,9 +2095,10 @@ public class HdfsScanNode extends ScanNode {
       // TODO: IMPALA-6875 - ORC should compute total reservation across columns once the
       // ORC scanner supports reservations. For now it is treated the same as a
       // row-oriented format because there is no per-column reservation.
-      if (format.isParquetBased()) {
-        // With Parquet, we first read the footer then all of the materialized columns in
-        // parallel.
+      if (format.isParquetBased()
+          || (format == HdfsFileFormat.ORC && queryOptions.orc_async_read)) {
+        // With Parquet and ORC, we first read the footer then all of the materialized
+        // columns in parallel.
         for (long columnReservation : columnReservations) {
           formatReservationBytes += columnReservation;
         }
@@ -2119,14 +2130,22 @@ public class HdfsScanNode extends ScanNode {
    * Compute minimum memory reservations in bytes per column per scan range for each of
    * the columns read from disk for a columnar format. Returns the raw estimate for
    * each column, not quantized to a buffer size.
-
+   *
    * If there are nested collections, returns a size for each of the leaf scalar slots
    * per collection. This matches Parquet's "shredded" approach to nested collections,
-   * where each nested field is stored as a separate column. We may need to adjust this
-   * logic for nested types in non-shredded columnar formats (e.g. IMPALA-6503 - ORC)
-   * if/when that is added.
+   * where each nested field is stored as a separate column.
+   *
+   * If the table is in ORC format and orcAsyncRead is true, each column's reservation is
+   * split evenly across the streams estimated for that column. For example, an ORC string
+   * column with dictionary encoding has four streams (PRESENT, DATA, DICTIONARY_DATA, and
+   * LENGTH). Calling computeMinColumnMemReservations on an ORC table with one string
+   * column returns the list:
+   *   [1048576, 1048576, 1048576, 1048576]
+   * Meanwhile, computeMinColumnMemReservations over Parquet table having one string
+   * column will return list:
+   *   [4194304]
    */
-  private List<Long> computeMinColumnMemReservations() {
+  private List<Long> computeMinColumnMemReservations(boolean orcAsyncRead) {
     List<Long> columnByteSizes = new ArrayList<>();
     FeFsTable table = (FeFsTable) desc_.getTable();
     boolean havePosSlot = false;
@@ -2134,28 +2153,56 @@ public class HdfsScanNode extends ScanNode {
       if (!slot.isMaterialized() || slot == countStarSlot_) continue;
       if (slot.getColumn() == null ||
           slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
+        Type type = slot.getType();
         if (slot.isArrayPosRef()) {
           // Position virtual slots can be materialized by piggybacking on another slot.
           havePosSlot = true;
-        } else if (slot.getType().isScalarType()) {
+        } else if (type.isScalarType()) {
           Column column = slot.getColumn();
+          long estReservation;
           if (column == null) {
             // Not a top-level column, e.g. a value from a nested collection that is
             // being unnested by the scanner. No stats are available for nested
             // collections.
-            columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+            estReservation = DEFAULT_COLUMN_SCAN_RANGE_RESERVATION;
+          } else {
+            estReservation = computeMinScalarColumnMemReservation(column);
+          }
+
+          if (orcAsyncRead) {
+            // estReservation needs to be spread across the streams. Estimate how many
+            // streams this column will have based on its type.
+            int estNumStream = 2; // DATA and PRESENT stream.
+            if (type.isTimestamp() || type.isStringType() || type.isDecimal()) {
+              estNumStream++; // SECONDARY/LENGTH stream.
+              if (type.isStringType()) {
+                estNumStream++; // DICTIONARY_DATA stream
+              }
+            }
+            long reservationPerStream = estReservation / estNumStream;
+            for (int i = 0; i < estNumStream; i++) {
+              columnByteSizes.add(reservationPerStream);
+            }
           } else {
-            columnByteSizes.add(computeMinScalarColumnMemReservation(column));
+            columnByteSizes.add(estReservation);
           }
         } else {
-          appendMinColumnMemReservationsForComplexType(slot, columnByteSizes);
+          appendMinColumnMemReservationsForComplexType(
+              slot, columnByteSizes, orcAsyncRead);
         }
       }
     }
-    if (havePosSlot && columnByteSizes.isEmpty()) {
-      // Must scan something to materialize a position slot. We don't know anything about
-      // the column that we're scanning so use the default reservation.
-      columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+
+    if (columnByteSizes.isEmpty()) {
+      if (havePosSlot || (orcAsyncRead && desc_.getSlots().isEmpty())) {
+        // We must scan something to materialize a position slot (havePosSlot=true).
+        // For the case of orcAsyncRead=true and no descriptor slots, we probably need
+        // to materialize something too if the query cannot be served by file metadata
+        // (e.g., select count(*) against a subtype of a complex column).
+        // We do not know anything about the column we are scanning in either case, so use
+        // the default reservation.
+        appendDefaultColumnReservation(columnByteSizes, orcAsyncRead);
+      }
     }
     return columnByteSizes;
   }
@@ -2163,10 +2210,11 @@ public class HdfsScanNode extends ScanNode {
   /**
    * Helper for computeMinColumnMemReservations() - compute minimum memory reservations
    * for all of the scalar columns read from disk when materializing complexSlot.
-   * Appends one number per scalar column to columnMemReservations.
+   * Appends one number per scalar column to columnMemReservations for Parquet format.
+   * For ORC, up to 4 numbers per scalar column can be added to columnMemReservations.
    */
   private void appendMinColumnMemReservationsForComplexType(SlotDescriptor complexSlot,
-      List<Long> columnMemReservations) {
+      List<Long> columnMemReservations, boolean orcAsyncRead) {
     Preconditions.checkState(complexSlot.getType().isComplexType());
     boolean addedColumn = false;
     for (SlotDescriptor nestedSlot: complexSlot.getItemTupleDesc().getSlots()) {
@@ -2175,16 +2223,40 @@ public class HdfsScanNode extends ScanNode {
       if (nestedSlot.getType().isScalarType()) {
         // No column stats are available for nested collections so use the default
         // reservation.
-        columnMemReservations.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+        appendDefaultColumnReservation(columnMemReservations, orcAsyncRead);
         addedColumn = true;
       } else if (nestedSlot.getType().isComplexType()) {
-        appendMinColumnMemReservationsForComplexType(nestedSlot, columnMemReservations);
+        appendMinColumnMemReservationsForComplexType(
+            nestedSlot, columnMemReservations, orcAsyncRead);
       }
     }
     // Need to scan at least one column to materialize the pos virtual slot and/or
     // determine the size of the nested array. Assume it is the size of a single I/O
     // buffer.
-    if (!addedColumn) columnMemReservations.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+    if (!addedColumn) {
+      appendDefaultColumnReservation(columnMemReservations, orcAsyncRead);
+    }
+  }
+
+  /**
+   * Compute minimum memory reservation for an unknown column.
+   *
+   * If orcAsyncRead is true, we allocate DEFAULT_COLUMN_SCAN_RANGE_RESERVATION and split
+   * it across 4 streams, because an ORC column can have at most 4 streams (e.g. a
+   * dictionary-encoded string/char/varchar column). If orcAsyncRead is false, just append
+   * a single default reservation.
+   */
+  private void appendDefaultColumnReservation(
+      List<Long> columnMemReservations, boolean orcAsyncRead) {
+    if (orcAsyncRead) {
+      int estNumStream = 4;
+      Long reservationPerStream = DEFAULT_COLUMN_SCAN_RANGE_RESERVATION / estNumStream;
+      for (int i = 0; i < estNumStream; i++) {
+        columnMemReservations.add(reservationPerStream);
+      }
+    } else {
+      columnMemReservations.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+    }
   }
 
   /**
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index fd42e1f..6702f84 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1464,12 +1464,12 @@ select * from tpch_orc_def.lineitem
 # Hive 3 creates different number of files for this table than Hive 2.
 3
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=12.00MB Threads=2
+Max Per-Host Resource Reservation: Memory=60.00MB Threads=2
 Per-Host Resource Estimates: Memory=188MB
 Analyzed query: SELECT * FROM tpch_orc_def.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=188.00MB mem-reservation=12.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=188.00MB mem-reservation=60.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: tpch_orc_def.lineitem.l_orderkey, tpch_orc_def.lineitem.l_partkey, tpch_orc_def.lineitem.l_suppkey, tpch_orc_def.lineitem.l_linenumber, tpch_orc_def.lineitem.l_quantity, tpch_orc_def.lineitem.l_extendedprice, tpch_orc_def.lineitem.l_discount, tpch_orc_def.lineitem.l_tax, tpch_orc_def.lineitem.l_returnflag, tpch_orc_def.lineitem.l_linestatus, tpch_orc_def.lineitem.l_shipdate, tpch_orc_def.lineitem.l_commitdate, tpch_orc_def.lineitem.l_receiptdate, tpch_orc_def.lineitem.l_ [...]
 |  mem-estimate=100.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -1480,7 +1480,7 @@ PLAN-ROOT SINK
      table: rows=6.00M size=142.84MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=6.00M
-   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   mem-estimate=88.00MB mem-reservation=56.00MB thread-reservation=1
    tuple-ids=0 row-size=231B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ====
@@ -1515,12 +1515,12 @@ select l_comment from tpch_orc_def.lineitem
 # Hive 3 creates different number of files for this table than Hive 2.
 3
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=12.00MB Threads=2
+Max Per-Host Resource Reservation: Memory=8.00MB Threads=2
 Per-Host Resource Estimates: Memory=188MB
 Analyzed query: SELECT l_comment FROM tpch_orc_def.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=188.00MB mem-reservation=12.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=188.00MB mem-reservation=8.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: l_comment
 |  mem-estimate=100.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -1531,21 +1531,21 @@ PLAN-ROOT SINK
      table: rows=6.00M size=142.84MB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=6.00M
-   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   mem-estimate=88.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=38B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ====
 # ORC scan on small files - memory reservation is reduced.
 select * from functional_orc_def.alltypes
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=4.09MB Threads=2
+Max Per-Host Resource Reservation: Memory=4.21MB Threads=2
 Per-Host Resource Estimates: Memory=26MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_orc_def.alltypes
 Analyzed query: SELECT * FROM functional_orc_def.alltypes
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=26.00MB mem-reservation=4.09MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=26.00MB mem-reservation=4.21MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: functional_orc_def.alltypes.id, functional_orc_def.alltypes.bool_col, functional_orc_def.alltypes.tinyint_col, functional_orc_def.alltypes.smallint_col, functional_orc_def.alltypes.int_col, functional_orc_def.alltypes.bigint_col, functional_orc_def.alltypes.float_col, functional_orc_def.alltypes.double_col, functional_orc_def.alltypes.date_string_col, functional_orc_def.alltypes.string_col, functional_orc_def.alltypes.timestamp_col, functional_orc_def.alltypes.year, func [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -1557,7 +1557,7 @@ PLAN-ROOT SINK
      partitions: 0/24 rows=unavailable
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
+   mem-estimate=16.00MB mem-reservation=216.00KB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=unavailable
    in pipelines: 00(GETNEXT)
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
index e0b1c36..894fe44 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
@@ -35,8 +35,8 @@ STRING
 ---- RUNTIME_PROFILE
 row_regex:.*InitialRangeIdealReservation.*Avg: 128.00 KB.*
 row_regex:.*InitialRangeActualReservation.*Avg: 4.00 MB.*
-row_regex:.*ParquetRowGroupIdealReservation.*Avg: 24.00 MB.*
-row_regex:.*ParquetRowGroupActualReservation.*Avg: 24.00 MB.*
+row_regex:.*ColumnarScannerIdealReservation.*Avg: 24.00 MB.*
+row_regex:.*ColumnarScannerActualReservation.*Avg: 24.00 MB.*
 ====
 ---- QUERY
 # Scan moderately large file - scanner should try to increase reservation and fail.
@@ -50,8 +50,8 @@ STRING
 ---- RUNTIME_PROFILE
 row_regex:.*InitialRangeIdealReservation.*Avg: 128.00 KB.*
 row_regex:.*InitialRangeActualReservation.*Avg: 4.00 MB.*
-row_regex:.*ParquetRowGroupIdealReservation.*Avg: 24.00 MB.*
-row_regex:.*ParquetRowGroupActualReservation.*Avg: 4.00 MB.*
+row_regex:.*ColumnarScannerIdealReservation.*Avg: 24.00 MB.*
+row_regex:.*ColumnarScannerActualReservation.*Avg: 4.00 MB.*
 ====
 ---- QUERY
 # IMPALA-8742: Use ScanRange::bytes_to_read() instead of len(), it has an effect
@@ -59,5 +59,5 @@ row_regex:.*ParquetRowGroupActualReservation.*Avg: 4.00 MB.*
 select * from tpch_parquet.lineitem
 where l_orderkey < 10;
 ---- RUNTIME_PROFILE
-row_regex:.*ParquetRowGroupIdealReservation.*Avg: [34].\d+ MB.*
+row_regex:.*ColumnarScannerIdealReservation.*Avg: [34].\d+ MB.*
 ====