You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jr...@apache.org on 2018/01/12 09:10:13 UTC

[2/3] impala git commit: IMPALA-6290: limit ScannerContext to 1 buffer at a time

IMPALA-6290: limit ScannerContext to 1 buffer at a time

This is a prerequisite for constraining the number of buffers per scan
range. Before this patch, calling ReadBytes(), SkipBytes(), etc could
cause an arbitrary number of I/O buffers to accumulate in
'completed_io_buffers_'. E.g. if we allocated 3 * 8MB I/O buffers for
a range and then called ReadBytes(30MB), we would hit resource
exhaustion as soon as 3 buffers were accumulated in
'completed_io_buffers_'.

The fix is to avoid accumulating any buffers in 'completed_io_buffers_'.
Instead of adding them to 'completed_io_buffers_', completed buffers
are just returned to the I/O manager. It turned out that this did not
weaken the ScannerContext's guarantees about memory lifetime, because
ScannerContext::GetBytesInternal() cleared 'boundary_buffer_' each
time it was called regardless. I checked that this behaviour wasn't
a bug by inspecting the scanner code. I could not find any cases
where scanners depended on returned memory remaining valid beyond
the next Read*()/Get*()/Skip*() call on the stream.

This change makes that lifetime explicit in the comments. A
side-effect of this fix is that scanners do not need to call
ReleaseCompletedResources() in CommitRows() and means that the
ScannerContext only ever needs to hold one I/O buffer at a time.

This change also reimplements SkipBytes() to avoid it accumulating
memory in the boundary buffer for large skip sizes.

Also clarifies some of the invariants in ScannerContext. E.g. some
places assumed io_buffer_ != NULL, but that is no longer needed.

Testing:
Ran core tests with ASAN and exhaustive tests with DEBUG.

Change-Id: I74c5960a75f7d88b0e1de4199af731fb13e592f0
Reviewed-on: http://gerrit.cloudera.org:8080/8814
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fd5c3a7e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fd5c3a7e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fd5c3a7e

Branch: refs/heads/master
Commit: fd5c3a7e18d1fc4f340a349713035a2a6fd17792
Parents: 20daa4d
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Dec 5 10:57:35 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 12 02:06:33 2018 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc  |   1 -
 be/src/exec/hdfs-parquet-scanner.cc   |   1 -
 be/src/exec/hdfs-scanner.cc           |   7 -
 be/src/exec/hdfs-scanner.h            |  10 +-
 be/src/exec/hdfs-text-scanner.cc      |  42 +++---
 be/src/exec/parquet-column-readers.cc |   3 +-
 be/src/exec/scanner-context.cc        | 215 +++++++++++++++--------------
 be/src/exec/scanner-context.h         | 178 ++++++++++++++++--------
 be/src/exec/scanner-context.inline.h  |  55 ++++++--
 be/src/runtime/io/disk-io-mgr.cc      |   4 +
 10 files changed, 311 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 0475ac3..9cb6330 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -142,7 +142,6 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
-  DCHECK_EQ(context_->num_completed_io_buffers(), 0);
   // 'header_' can be nullptr if HdfsScanNodeBase::CreateAndOpenScanner() failed.
   if (!only_parsing_header_ && header_ != nullptr) {
     scan_node_->RangeComplete(file_format(), header_->compression_type);

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 4e4abef..f0f280d 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -279,7 +279,6 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
   DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
   DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);
-  DCHECK_EQ(context_->num_completed_io_buffers(), 0);
 
   // Collect compression types for reporting completed ranges.
   vector<THdfsCompression::type> compression_types;

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 2b1a3b4..4cafa5d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -188,13 +188,6 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
   row_batch->CommitRows(num_rows);
   tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
   tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
-
-  // We need to pass the row batch to the scan node if there is too much memory attached,
-  // which can happen if the query is very selective. We need to release memory even
-  // if no rows passed predicates.
-  if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
-    context_->ReleaseCompletedResources(/* done */ false);
-  }
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
   RETURN_IF_ERROR(state_->GetQueryStatus());

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 95bb58a..6228744 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -94,11 +94,10 @@ struct FieldLocation {
 ///    the codegen'd function to use.
 /// This way, we only codegen once per scanner type, rather than once per scanner object.
 //
-/// This class also encapsulates row batch management.  Subclasses should call CommitRows()
-/// after writing to the current row batch, which handles creating row batches, attaching
-/// resources (buffers and mem pools) to the current row batch, and passing row batches
-/// up to the scan node. Subclasses can also use GetMemory() to help with per-row memory
-/// management.
+/// This class also encapsulates row batch management. Subclasses should call
+/// CommitRows() after writing to the current row batch, which handles creating row
+/// batches, releasing per-batch resources, and passing row batches up to the scan node.
+/// Subclasses can also use GetMemory() to help with per-row memory management.
 /// TODO: Have a pass over all members and move them out of the base class if sensible
 /// to clarify which state each concrete scanner type actually has.
 class HdfsScanner {
@@ -316,7 +315,6 @@ class HdfsScanner {
       Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) WARN_UNUSED_RESULT;
 
   /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly.
-  /// Attaches completed resources from 'context_' to 'row_batch' if necessary.
   /// Frees expr result allocations. Returns non-OK if 'context_' is cancelled or the
   /// query status in 'state_' is non-OK.
   Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 9ef7ce2..253bcc8 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -181,7 +181,6 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
-  DCHECK_EQ(context_->num_completed_io_buffers(), 0);
   if (!only_parsing_header_) {
     scan_node_->RangeComplete(THdfsFileFormat::TEXT,
         stream_->file_desc()->file_compression);
@@ -474,14 +473,17 @@ Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes)
   if (decompressor_.get() == nullptr) {
     Status status;
     if (num_bytes > 0) {
-      stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-          &byte_buffer_read_size_, &status);
+      if (!stream_->GetBytes(num_bytes,
+          reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), &byte_buffer_read_size_,
+          &status)) {
+        DCHECK(!status.ok());
+        return status;
+      }
     } else {
       DCHECK_EQ(num_bytes, 0);
-      status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-          &byte_buffer_read_size_);
+      RETURN_IF_ERROR(stream_->GetBuffer(false,
+          reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), &byte_buffer_read_size_));
     }
-    RETURN_IF_ERROR(status);
     *eosr = stream_->eosr();
   } else if (decompressor_->supports_streaming()) {
     DCHECK_EQ(num_bytes, 0);
@@ -511,9 +513,11 @@ Status HdfsTextScanner::DecompressBufferStream(int64_t bytes_to_read,
   } else {
     DCHECK_GT(bytes_to_read, 0);
     Status status;
-    stream_->GetBytes(bytes_to_read, &compressed_buffer_ptr, &compressed_buffer_size,
-        &status, true);
-    RETURN_IF_ERROR(status);
+    if (!stream_->GetBytes(bytes_to_read, &compressed_buffer_ptr, &compressed_buffer_size,
+        &status, true)) {
+      DCHECK(!status.ok());
+      return status;
+    }
   }
   int64_t compressed_buffer_bytes_read = 0;
   bool stream_end = false;
@@ -533,8 +537,10 @@ Status HdfsTextScanner::DecompressBufferStream(int64_t bytes_to_read,
   }
   // Skip the bytes in stream_ that were decompressed.
   Status status;
-  stream_->SkipBytes(compressed_buffer_bytes_read, &status);
-  RETURN_IF_ERROR(status);
+  if (!stream_->SkipBytes(compressed_buffer_bytes_read, &status)) {
+    DCHECK(!status.ok());
+    return status;
+  }
 
   if (stream_->eosr()) {
     if (stream_end) {
@@ -600,9 +606,11 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
   DCHECK_GT(file_size, 0);
 
   Status status;
-  stream_->GetBytes(file_size, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-      &byte_buffer_read_size_, &status);
-  RETURN_IF_ERROR(status);
+  if (!stream_->GetBytes(file_size, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
+      &byte_buffer_read_size_, &status)) {
+    DCHECK(!status.ok());
+    return status;
+  }
 
   // If didn't read anything, return.
   if (byte_buffer_read_size_ == 0) {
@@ -725,8 +733,10 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
   Status status;
   uint8_t* next_byte;
   int64_t out_len;
-  stream_->GetBytes(1, &next_byte, &out_len, &status, /*peek*/ true);
-  RETURN_IF_ERROR(status);
+  if (!stream_->GetBytes(1, &next_byte, &out_len, &status, /*peek*/ true)) {
+    DCHECK(!status.ok());
+    return status;
+  }
 
   // No more bytes after current buffer
   if (out_len == 0) return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 1c1cf39..317a4a5 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -27,6 +27,7 @@
 #include "exec/parquet-metadata-utils.h"
 #include "exec/parquet-scratch-tuple-batch.h"
 #include "exec/read-write-util.h"
+#include "exec/scanner-context.inline.h"
 #include "rpc/thrift-util.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/tuple-row.h"
@@ -1054,7 +1055,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
     data_page_pool_->FreeAll();
   }
   // We don't hold any pointers to earlier pages in the stream - we can safely free
-  // any accumulated I/O or boundary buffers.
+  // any I/O or boundary buffer.
   stream_->ReleaseCompletedResources(false);
 
   // Read the next data page, skipping page types we don't care about.

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index d9de769..0311f0e 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/scanner-context.h"
+#include "exec/scanner-context.inline.h"
 
 #include <gutil/strings/substitute.h>
 
@@ -38,11 +38,7 @@ using namespace strings;
 
 static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 
-// We always want output_buffer_bytes_left_ to be non-NULL, so we can avoid a NULL check
-// in GetBytes(). We use this variable, which is set to 0, to initialize
-// output_buffer_bytes_left_. After the first successful call to GetBytes(),
-// output_buffer_bytes_left_ will be set to something else.
-static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
+const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
     HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
@@ -50,7 +46,6 @@ ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
   : state_(state),
     scan_node_(scan_node),
     partition_desc_(partition_desc),
-    num_completed_io_buffers_(0),
     filter_ctxs_(filter_ctxs),
     expr_results_pool_(expr_results_pool) {
   AddStream(scan_range);
@@ -70,79 +65,59 @@ void ScannerContext::ClearStreams() {
   streams_.clear();
 }
 
-ScannerContext::Stream::Stream(ScannerContext* parent)
+ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
+    const HdfsFileDesc* file_desc)
   : parent_(parent),
+    scan_range_(scan_range),
+    file_desc_(file_desc),
+    file_len_(file_desc->file_length),
     next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
     boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
     boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
 
 ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
-  std::unique_ptr<Stream> stream(new Stream(this));
-  stream->scan_range_ = range;
-  stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename());
-  stream->file_len_ = stream->file_desc_->file_length;
-  stream->total_bytes_returned_ = 0;
-  stream->io_buffer_pos_ = NULL;
-  stream->io_buffer_bytes_left_ = 0;
-  stream->boundary_buffer_bytes_left_ = 0;
-  stream->output_buffer_pos_ = NULL;
-  stream->output_buffer_bytes_left_ =
-      const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
-  streams_.push_back(std::move(stream));
+  streams_.emplace_back(new Stream(
+      this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
   return streams_.back().get();
 }
 
 void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
   if (done) {
-    // Mark any pending resources as completed
-    if (io_buffer_ != nullptr) {
-      ++parent_->num_completed_io_buffers_;
-      completed_io_buffers_.push_back(move(io_buffer_));
-    }
-    // Set variables to nullptr to make sure streams are not used again
-    io_buffer_pos_ = nullptr;
-    io_buffer_bytes_left_ = 0;
     // Cancel the underlying scan range to clean up any queued buffers there
     scan_range_->Cancel(Status::CANCELLED);
-  }
+    boundary_pool_->FreeAll();
 
-  for (unique_ptr<BufferDescriptor>& buffer : completed_io_buffers_) {
-    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
+    // Reset variables - the stream is no longer valid.
+    io_buffer_pos_ = nullptr;
+    io_buffer_bytes_left_ = 0;
+    boundary_buffer_pos_ = nullptr;
+    boundary_buffer_bytes_left_ = 0;
+    boundary_buffer_->Reset();
   }
-  parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
-  completed_io_buffers_.clear();
-
-  if (done) boundary_pool_->FreeAll();
+  // Check if we're done with the current I/O buffer.
+  if (io_buffer_ != nullptr && io_buffer_bytes_left_ == 0) ReturnIoBuffer();
 }
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
+  DCHECK_EQ(0, io_buffer_bytes_left_);
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
+  if (io_buffer_ != nullptr) ReturnIoBuffer();
 
-  // Nothing to do if we've already processed all data in the file
+  // Nothing to do if we're at the end of the file - return leaving io_buffer_ == nullptr.
   int64_t offset = file_offset() + boundary_buffer_bytes_left_;
   int64_t file_bytes_remaining = file_desc()->file_length - offset;
-  if (io_buffer_ == NULL && file_bytes_remaining == 0) return Status::OK();
-
-  // Otherwise, io_buffer_ should only be null the first time this is called
-  DCHECK(io_buffer_ != NULL ||
-         (total_bytes_returned_ == 0 && completed_io_buffers_.empty()));
-
-  // We can't use the eosr() function because it reflects how many bytes have been
-  // returned, not if we're fetched all the buffers in the scan range
-  bool eosr = false;
-  if (io_buffer_ != NULL) {
-    eosr = io_buffer_->eosr();
-    ++parent_->num_completed_io_buffers_;
-    completed_io_buffers_.push_back(move(io_buffer_));
-  }
+  if (file_bytes_remaining == 0) return Status::OK();
 
-  if (!eosr) {
+  if (!scan_range_eosr_) {
+    // Get the next buffer from 'scan_range_'.
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
     Status status = scan_range_->GetNext(&io_buffer_);
-    DCHECK(!status.ok() || io_buffer_ != NULL);
+    DCHECK(!status.ok() || io_buffer_ != nullptr);
     RETURN_IF_ERROR(status);
+    scan_range_eosr_ = io_buffer_->eosr();
   } else {
+    // Already got all buffers from 'scan_range_' - reading past end.
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
     int64_t read_past_buffer_size = 0;
@@ -162,7 +137,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     DCHECK_GE(read_past_buffer_size, 0);
     if (read_past_buffer_size == 0) {
       io_buffer_bytes_left_ = 0;
-      // TODO: We are leaving io_buffer_ = NULL, revisit.
       return Status::OK();
     }
     int64_t partition_id = parent_->partition_descriptor()->id();
@@ -173,8 +147,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
         parent_->scan_node_->reader_context(), range, &io_buffer_));
   }
 
-  DCHECK(io_buffer_ != NULL);
-  if (UNLIKELY(io_buffer_ == NULL)) {
+  DCHECK(io_buffer_ != nullptr);
+  if (UNLIKELY(io_buffer_ == nullptr)) {
     // This has bitten us before, so we defend against NULL in release builds here. It
     // indicates an error in the IoMgr, which did not return a valid buffer.
     // TODO(IMPALA-5914): Remove this check once we're confident we're not hitting it.
@@ -195,12 +169,12 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
 }
 
 Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_t* len) {
-  *out_buffer = NULL;
+  *out_buffer = nullptr;
   *len = 0;
   if (eosr()) return Status::OK();
 
   if (UNLIKELY(parent_->cancelled())) {
-    DCHECK(*out_buffer == NULL);
+    DCHECK(*out_buffer == nullptr);
     return Status::CANCELLED;
   }
 
@@ -212,8 +186,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
     *len = min(boundary_buffer_bytes_left_, bytes_left());
     DCHECK_GE(*len, 0);
     if (!peek) {
-      boundary_buffer_pos_ += *len;
-      boundary_buffer_bytes_left_ -= *len;
+      AdvanceBufferPos(*len, &boundary_buffer_pos_, &boundary_buffer_bytes_left_);
       total_bytes_returned_ += *len;
     }
     return Status::OK();
@@ -229,13 +202,12 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
     output_buffer_pos_ = &io_buffer_pos_;
     output_buffer_bytes_left_ = &io_buffer_bytes_left_;
   }
-  DCHECK(io_buffer_ != NULL);
+  DCHECK(io_buffer_ != nullptr);
 
   *out_buffer = io_buffer_pos_;
   *len = io_buffer_bytes_left_;
   if (!peek) {
-    io_buffer_bytes_left_ = 0;
-    io_buffer_pos_ += *len;
+    AdvanceBufferPos(*len, &io_buffer_pos_, &io_buffer_bytes_left_);
     total_bytes_returned_ += *len;
   }
   DCHECK_GE(bytes_left(), 0);
@@ -246,74 +218,117 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
 Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     uint8_t** out_buffer, bool peek, int64_t* out_len) {
   DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
-  *out_buffer = NULL;
+  DCHECK(output_buffer_bytes_left_ != &io_buffer_bytes_left_
+      || requested_len > io_buffer_bytes_left_) << "All bytes in output buffer "
+      << requested_len << " " << io_buffer_bytes_left_;
+  *out_buffer = nullptr;
 
   if (boundary_buffer_bytes_left_ == 0) boundary_buffer_->Clear();
-
   DCHECK(ValidateBufferPointers());
 
-  while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
-    // We must copy the remainder of 'io_buffer_' to 'boundary_buffer_' before advancing
-    // to handle the case when the read straddles a block boundary. Preallocate
-    // 'boundary_buffer_' to avoid unnecessary resizes for large reads.
+  // First this loop ensures, by reading I/O buffers one-by-one, that we've got all of
+  // the requested bytes in 'boundary_buffer_', 'io_buffer_', or split between the two.
+  // We may not be able to get all of the bytes if we hit eof.
+  while (boundary_buffer_bytes_left_ + io_buffer_bytes_left_ < requested_len) {
     if (io_buffer_bytes_left_ > 0) {
+      // Copy the remainder of 'io_buffer_' to 'boundary_buffer_' before getting the next
+      // 'io_buffer_'. Preallocate 'boundary_buffer_' to avoid unnecessary resizes for
+      // large reads.
       RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
-      RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
-      boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
-
-      // Make state consistent in case we return early with an error below.
-      io_buffer_bytes_left_ = 0;
-      output_buffer_pos_ = &boundary_buffer_pos_;
-      output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
+      RETURN_IF_ERROR(CopyIoToBoundary(io_buffer_bytes_left_));
     }
-
-    int64_t remaining_requested_len = requested_len - boundary_buffer_->len();
+    int64_t remaining_requested_len = requested_len - boundary_buffer_bytes_left_;
     RETURN_IF_ERROR(GetNextBuffer(remaining_requested_len));
     if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
     // No more bytes (i.e. EOF).
     if (io_buffer_bytes_left_ == 0) break;
   }
 
-  // We have read the full 'requested_len' bytes or couldn't read more bytes.
+  // We have read the full 'requested_len' bytes or hit eof.
+  // We can assemble the contiguous bytes in two ways:
+  // 1. if the the read range falls entirely with an I/O buffer, we return a pointer into
+  //    that I/O buffer.
+  // 2. if the read straddles I/O buffers, we append the data to 'boundary_buffer_'.
+  //    'boundary_buffer_' may already contain some of the data that we need if we did a
+  //    "peek" earlier.
   int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
   DCHECK_GE(requested_bytes_left, 0);
-  int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);
-  *out_len = boundary_buffer_bytes_left_ + num_bytes;
+  int64_t num_bytes_left_to_copy = min(io_buffer_bytes_left_, requested_bytes_left);
+  *out_len = boundary_buffer_bytes_left_ + num_bytes_left_to_copy;
   DCHECK_LE(*out_len, requested_len);
-
   if (boundary_buffer_bytes_left_ == 0) {
-    // No stitching, just return the memory
+    // Case 1: return a pointer into the I/O buffer.
     output_buffer_pos_ = &io_buffer_pos_;
     output_buffer_bytes_left_ = &io_buffer_bytes_left_;
   } else {
-    RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
-    boundary_buffer_bytes_left_ += num_bytes;
-    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
-        boundary_buffer_->len() - boundary_buffer_bytes_left_;
-    io_buffer_bytes_left_ -= num_bytes;
-    io_buffer_pos_ += num_bytes;
-
-    output_buffer_pos_ = &boundary_buffer_pos_;
-    output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
+    // Case 2: return a pointer into the boundary buffer, after copying any required
+    // data from the I/O buffer.
+    DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
+    DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
+    if (io_buffer_bytes_left_ > 0) {
+      RETURN_IF_ERROR(CopyIoToBoundary(num_bytes_left_to_copy));
+    }
   }
   *out_buffer = *output_buffer_pos_;
-
   if (!peek) {
     total_bytes_returned_ += *out_len;
-    if (boundary_buffer_bytes_left_ == 0) {
-      io_buffer_bytes_left_ -= num_bytes;
-      io_buffer_pos_ += num_bytes;
-    } else {
-      DCHECK_EQ(boundary_buffer_bytes_left_, *out_len);
-      boundary_buffer_bytes_left_ = 0;
-    }
+    AdvanceBufferPos(*out_len, output_buffer_pos_, output_buffer_bytes_left_);
   }
+  DCHECK(ValidateBufferPointers());
+  return Status::OK();
+}
 
+bool ScannerContext::Stream::SkipBytesInternal(
+    int64_t length, int64_t bytes_left, Status* status) {
+  DCHECK_GT(bytes_left, 0);
+  DCHECK_EQ(0, boundary_buffer_bytes_left_);
+  DCHECK_EQ(0, io_buffer_bytes_left_);
+  // Skip data in subsequent buffers by simply fetching them.
+  // TODO: consider adding support to skip ahead in a ScanRange so we can avoid doing
+  // actual I/O in some cases.
+  while (bytes_left > 0) {
+    *status = GetNextBuffer(bytes_left);
+    if (!status->ok()) return false;
+    if (io_buffer_ == nullptr) {
+      // Hit end of file before reading the requested bytes.
+      DCHECK_GT(bytes_left, 0);
+      *status = ReportIncompleteRead(length, length - bytes_left);
+      return false;
+    }
+    int64_t io_buffer_bytes_to_skip = std::min(bytes_left, io_buffer_bytes_left_);
+    AdvanceBufferPos(io_buffer_bytes_to_skip, &io_buffer_pos_, &io_buffer_bytes_left_);
+    // Check if we skipped all data in this I/O buffer.
+    if (io_buffer_bytes_left_ == 0) ReturnIoBuffer();
+    bytes_left -= io_buffer_bytes_to_skip;
+    total_bytes_returned_ += io_buffer_bytes_to_skip;
+  }
+  return true;
+}
 
-  DCHECK(ValidateBufferPointers());
+Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
+  DCHECK(io_buffer_ != nullptr);
+  DCHECK_GT(io_buffer_bytes_left_, 0);
+  DCHECK_GE(io_buffer_bytes_left_, num_bytes);
+  RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
+  boundary_buffer_bytes_left_ += num_bytes;
+  boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
+      boundary_buffer_->len() - boundary_buffer_bytes_left_;
+  AdvanceBufferPos(num_bytes, &io_buffer_pos_, &io_buffer_bytes_left_);
+  // If all data from I/O buffer was returned or copied to boundary buffer, we don't need
+  // I/O buffer.
+  if (io_buffer_bytes_left_ == 0) ReturnIoBuffer();
+  output_buffer_pos_ = &boundary_buffer_pos_;
+  output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
   return Status::OK();
 }
 
+void ScannerContext::Stream::ReturnIoBuffer() {
+  DCHECK(io_buffer_ != nullptr);
+  ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
+  io_buffer_pos_ = nullptr;
+  io_buffer_bytes_left_ = 0;
+}
+
 bool ScannerContext::cancelled() const {
   if (!scan_node_->HasRowBatchQueue()) return false;
   return static_cast<HdfsScanNode*>(scan_node_)->done();

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 3ad6753..e316063 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -57,6 +57,28 @@ class TupleRow;
 ///      from processing the bytes. This is the consumer.
 ///   3. The scan node/main thread which calls into the context to trigger cancellation
 ///      or other end of stream conditions.
+///
+/// Memory management
+/// =================
+/// Pointers into memory returned from stream methods remain valid until either
+/// ReleaseCompletedResources() is called or an operation advances the stream's read
+/// offset past the end of the memory .
+/// E.g. if ReadBytes(peek=false) is called, the memory returned is invalidated when
+/// ReadBytes(), SkipBytes(), ReadVint(), etc is called. If the memory is obtained by
+/// a "peeking" operation, then the memory returned remains valid until the read offset
+/// in the stream is advanced past the end of the memory. E.g. if
+/// ReadBuffer(n, peek=true) is called, then the memory remains valid if SkipBytes(n)
+/// is called the first time, but not if SkipBytes() is called again to advance further.
+///
+/// Each stream only requires a single I/O buffer to make progress on reading through the
+/// stream. Additional I/O buffers allow the I/O manager to read ahead in the scan range.
+/// The scanner context also allocates memory from a MemPool for reads that straddle I/O
+/// buffers (e.g. a small read at the boundary of I/O buffers or a read larger than
+/// a single I/O buffer). The amount of memory allocated from the MemPool is determined
+/// by the maximum buffer size read from the stream, plus some overhead. E.g.
+/// ReadBytes(length=50KB) requires allocating a 50KB buffer from the MemPool if the
+/// read straddles a buffer boundary.
+///
 /// TODO: Some of the synchronization mechanisms such as cancelled() can be removed
 /// once the legacy hdfs scan node has been removed.
 class ScannerContext {
@@ -93,7 +115,7 @@ class ScannerContext {
     /// Note that this will return bytes past the end of the scan range until the end of
     /// the file.
     bool GetBytes(int64_t requested_len, uint8_t** buffer, int64_t* out_len,
-        Status* status, bool peek = false);
+        Status* status, bool peek = false) WARN_UNUSED_RESULT;
 
     /// Gets the bytes from the first available buffer within the scan range. This may be
     /// the boundary buffer used to stitch IO buffers together.
@@ -117,11 +139,13 @@ class ScannerContext {
     /// Return the number of bytes left in the range for this stream.
     int64_t bytes_left() { return scan_range_->len() - total_bytes_returned_; }
 
-    /// If true, all bytes in this scan range have been returned or we have reached eof
-    /// (the scan range could be longer than the file).
+    /// If true, all bytes in this scan range have been returned from this ScannerContext
+    /// to callers or we hit eof before reaching the end of the scan range. Callers can
+    /// continue to call Read*()/Get*()/Skip*() methods on the stream until eof() is true.
     bool eosr() const { return total_bytes_returned_ >= scan_range_->len() || eof(); }
 
-    /// If true, the stream has reached the end of the file.
+    /// If true, the stream has reached the end of the file. After this is true, any
+    /// Read*()/Get*()/Skip*() methods will not succeed.
     bool eof() const { return file_offset() == file_len_; }
 
     const char* filename() { return scan_range_->file(); }
@@ -134,43 +158,56 @@ class ScannerContext {
     /// Returns the total number of bytes returned
     int64_t total_bytes_returned() { return total_bytes_returned_; }
 
-    /// Read a Boolean primitive value written using Java serialization.
+    /// Read a Boolean primitive value written using Java serialization. Returns true
+    /// on success, otherwise returns false and sets 'status' to indicate the error.
     /// Equivalent to java.io.DataInput.readBoolean()
-    bool ReadBoolean(bool* boolean, Status*);
+    bool ReadBoolean(bool* boolean, Status* status) WARN_UNUSED_RESULT;
 
-    /// Read an Integer primitive value written using Java serialization.
+    /// Read an Integer primitive value written using Java serialization. Returns true
+    /// on success, otherwise returns false and sets 'status' to indicate the error.
     /// Equivalent to java.io.DataInput.readInt()
-    bool ReadInt(int32_t* val, Status*, bool peek = false);
+    bool ReadInt(int32_t* val, Status* status, bool peek = false) WARN_UNUSED_RESULT;
 
-    /// Read a variable-length Long value written using Writable serialization.
+    /// Read a variable-length Long value written using Writable serialization. Returns
+    /// true on success, otherwise returns false and sets 'status' to indicate the error.
     /// Ref: org.apache.hadoop.io.WritableUtils.readVLong()
-    bool ReadVLong(int64_t* val, Status*);
+    bool ReadVLong(int64_t* val, Status* status) WARN_UNUSED_RESULT;
 
-    /// Read a variable length Integer value written using Writable serialization.
+    /// Read a variable length Integer value written using Writable serialization. Returns
+    /// true on success, otherwise returns false and sets 'status' to indicate the error.
     /// Ref: org.apache.hadoop.io.WritableUtils.readVInt()
-    bool ReadVInt(int32_t* val, Status*);
+    bool ReadVInt(int32_t* val, Status* status) WARN_UNUSED_RESULT;
 
-    /// Read a zigzag encoded long
-    bool ReadZLong(int64_t* val, Status*);
+    /// Read a zigzag encoded long. Returns true on success, otherwise returns false and
+    /// sets 'status' to indicate the error.
+    bool ReadZLong(int64_t* val, Status* status) WARN_UNUSED_RESULT;
 
-    /// Skip over the next length bytes in the specified HDFS file.
-    bool SkipBytes(int64_t length, Status*);
+    /// Skip over the next length bytes in the specified HDFS file. Returns true on
+    /// success, otherwise returns false and sets 'status' to indicate the error.
+    bool SkipBytes(int64_t length, Status* status) WARN_UNUSED_RESULT;
 
     /// Read length bytes into the supplied buffer.  The returned buffer is owned
-    /// by this object.
-    bool ReadBytes(int64_t length, uint8_t** buf, Status*, bool peek = false);
+    /// by this object. Returns true on success, otherwise returns false and sets 'status'
+    /// to indicate the error.
+    bool ReadBytes(int64_t length, uint8_t** buf, Status* status, bool peek = false)
+        WARN_UNUSED_RESULT;
 
-    /// Read a Writable Text value from the supplied file.
+    /// Read a Writable Text value from the supplied file. Returns true on success,
+    /// otherwise returns false and sets 'status' to indicate the error.
     /// Ref: org.apache.hadoop.io.WritableUtils.readString()
     /// The returned buffer is owned by this object.
-    bool ReadText(uint8_t** buf, int64_t* length, Status*);
+    bool ReadText(uint8_t** buf, int64_t* length, Status* status) WARN_UNUSED_RESULT;
 
-    /// Skip this text object.
-    bool SkipText(Status*);
+    /// Skip this text object. Returns true on success, otherwise returns false and
+    /// sets 'status'
+    bool SkipText(Status* status) WARN_UNUSED_RESULT;
 
-    /// Release all completed resources in the context, i.e. I/O and boundary buffers
-    /// that the caller has finished reading. If 'done' is true all resources are
-    /// freed, even if the caller has not read that data yet.
+    /// Release completed resources, e.g. the last buffer if the current read position is
+    /// at the end of the buffer. If 'done' is true all resources are freed, even if the
+    /// caller has not read that data yet. After calling this function, any memory
+    /// returned from previous Read*()/Get*() functions is invalid to reference.
+    ///
+    /// Also see the ScannerContext::ReleaseCompletedResources() comment.
     void ReleaseCompletedResources(bool done);
 
    private:
@@ -180,7 +217,7 @@ class ScannerContext {
     const HdfsFileDesc* file_desc_;
 
     /// Total number of bytes returned from GetBytes()
-    int64_t total_bytes_returned_;
+    int64_t total_bytes_returned_ = 0;
 
     /// File length. Initialized with file_desc_->file_length but updated if eof is found
     /// earlier, i.e. the file was truncated.
@@ -194,14 +231,20 @@ class ScannerContext {
     /// doubling algorithm. Unused if 'read_past_size_cb_' is set.
     int64_t next_read_past_size_bytes_;
 
-    /// The current io buffer. This starts as NULL before we've read any bytes.
+    /// The current I/O buffer. NULL before we've read any bytes or if the last read
+    /// I/O buffer was released.
     std::unique_ptr<io::BufferDescriptor> io_buffer_;
 
+    /// True if 'scan_range_' returned eosr, which means that we read to the end of that
+    /// scan range. This is different from eosr() because it tracks whether the
+    /// scan range reached eosr, not whether eosr() was returned to the caller.
+    bool scan_range_eosr_ = false;
+
     /// Next byte to read in io_buffer_
-    uint8_t* io_buffer_pos_;
+    uint8_t* io_buffer_pos_ = nullptr;
 
     /// Bytes left in io_buffer_
-    int64_t io_buffer_bytes_left_;
+    int64_t io_buffer_bytes_left_ = 0;
 
     /// The boundary buffer is used to copy multiple IO buffers from the scan range into a
     /// single buffer to return to the scanner.  After copying all or part of an IO buffer
@@ -212,30 +255,55 @@ class ScannerContext {
     /// scanner, in the current IO buffer, or in the boundary buffer.
     boost::scoped_ptr<MemPool> boundary_pool_;
     boost::scoped_ptr<StringBuffer> boundary_buffer_;
-    uint8_t* boundary_buffer_pos_;
-    int64_t boundary_buffer_bytes_left_;
+    uint8_t* boundary_buffer_pos_ = nullptr;
+    int64_t boundary_buffer_bytes_left_ = 0;
 
     /// Points to either io_buffer_pos_ or boundary_buffer_pos_
     /// (initialized to NULL before calling GetBytes())
-    uint8_t** output_buffer_pos_;
+    uint8_t** output_buffer_pos_ = nullptr;
 
     /// Points to either io_buffer_bytes_left_ or boundary_buffer_bytes_left_
     /// (initialized to a static zero-value int before calling GetBytes())
-    int64_t* output_buffer_bytes_left_;
+    int64_t* output_buffer_bytes_left_ =
+        const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
 
-    /// List of buffers that are completed but still have bytes referenced by the caller.
-    /// On the next GetBytes() call, these buffers are released (the caller by calling
-    /// GetBytes() signals it is done with its previous bytes).  At this point the
-    /// buffers are returned to the I/O manager.
-    std::deque<std::unique_ptr<io::BufferDescriptor>> completed_io_buffers_;
+    /// We always want output_buffer_bytes_left_ to be non-NULL, so we can avoid a NULL
+    /// check in GetBytes(). We use this variable, which is set to 0, to initialize
+    /// output_buffer_bytes_left_. After the first successful call to GetBytes(),
+    /// output_buffer_bytes_left_ will be set to something else.
+    static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
-    Stream(ScannerContext* parent);
+    Stream(ScannerContext* parent, io::ScanRange* scan_range,
+        const HdfsFileDesc* file_desc);
 
     /// GetBytes helper to handle the slow path.
-    /// If peek is set then return the data but do not move the current offset.
+    /// If 'peek' is true then return the data but do not move the current offset.
+    /// If 'peek' is not true, the returned buffer memory remains valid until next
+    /// operation that reads from the stream.
     Status GetBytesInternal(int64_t requested_len, uint8_t** buffer, bool peek,
                             int64_t* out_len);
 
+    /// SkipBytes() helper to handle the slow path where we need to skip past the
+    /// current I/O buffer. Called when the current I/O and boundary buffers are
+    /// exhausted.  Skips 'bytes_left' bytes in subsequent I/O buffers. 'length' is the
+    /// argument to the SkipBytes() call, used for error reporting. Sets 'io_buffer_',
+    /// 'io_buffer_pos_', 'io_buffer_bytes_left_' and 'total_bytes_returned_'.
+    bool SkipBytesInternal(int64_t length, int64_t bytes_left, Status* status);
+
+    /// Copy 'num_bytes' bytes from the I/O buffer at 'io_buffer_pos_' to the
+    /// boundary buffer and set 'output_buffer_pos_' and 'output_buffer_bytes_left_'
+    /// to point at the boundary buffer variables. Advances 'io_buffer_pos_' and
+    /// 'io_buffer_bytes_left_' by 'num_bytes'. Returns an error if the boundary
+    /// buffer cannot be extended to fit the new data.
+    ///
+    /// Returns 'io_buffer_' to the I/O manager if all its data was copied to the
+    /// boundary buffer.
+    Status CopyIoToBoundary(int64_t num_bytes);
+
+    /// Returns 'io_buffer_' to the I/O manager, setting it to NULL in the process,
+    /// and resets 'io_buffer_bytes_left_' and 'io_buffer_pos_'.
+    void ReturnIoBuffer();
+
     /// Gets (and blocks) for the next io buffer. After fetching all buffers in the scan
     /// range, performs synchronous reads past the scan range until EOF.
     //
@@ -251,6 +319,10 @@ class ScannerContext {
     /// never set to NULL, even if it contains 0 bytes.
     Status GetNextBuffer(int64_t read_past_size = 0);
 
+    /// Helper to advance position and bytes left for a buffer by 'bytes'.
+    void AdvanceBufferPos(
+        int64_t bytes, uint8_t** buffer_pos, int64_t* buffer_bytes_left);
+
     /// Validates that the output buffer pointers point to the correct buffer.
     bool ValidateBufferPointers() const;
 
@@ -266,24 +338,18 @@ class ScannerContext {
     return streams_[idx].get();
   }
 
-  /// Returns completed I/O buffers to the I/O manager. If 'done' is true, this is the
-  /// final call for the current streams and any pending resources in each stream are
-  /// also freed. Callers which want to clear the streams from the context should also
-  /// call ClearStreams().
+  /// Release completed resources for all streams, e.g. the last buffer in each stream if
+  /// the current read position is at the end of the buffer. If 'done' is true all
+  /// resources are freed, even if the caller has not read that data yet. After calling
+  /// this function, any memory returned from previous Read*()/Get*() functions is
+  /// invalid to reference. Callers which want to clear the streams from the context
+  /// should also call ClearStreams().
   ///
   /// This must be called with 'done' set when the scanner is complete and no longer needs
-  /// any resources (e.g. tuple memory, io buffers) returned from the current streams.
-  /// After calling with 'done' set, this should be called again if new streams are
-  /// created via AddStream().
+  /// any resources. After calling with 'done' set, this should be called again if new
+  /// streams are created via AddStream().
   void ReleaseCompletedResources(bool done);
 
-  /// Overload with the signature expected by Impala-lzo to enable easier staging of
-  /// the API change. TODO: remove this once Impala-lzo is updated to use the new
-  /// signature.
-  void ReleaseCompletedResources(RowBatch* batch, bool done) {
-    ReleaseCompletedResources(done);
-  }
-
   /// Releases all the Stream objects in the vector 'streams_' and reduces the vector's
   /// size to 0.
   void ClearStreams();
@@ -296,7 +362,6 @@ class ScannerContext {
   /// Always returns false if the scan_node_ is not multi-threaded.
   bool cancelled() const;
 
-  int num_completed_io_buffers() const { return num_completed_io_buffers_; }
   HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
   MemPool* expr_results_pool() const { return expr_results_pool_; }
@@ -311,9 +376,6 @@ class ScannerContext {
   /// Vector of streams. Non-columnar formats will always have one stream per context.
   std::vector<std::unique_ptr<Stream>> streams_;
 
-  /// Always equal to the sum of completed_io_buffers_.size() across all streams.
-  int num_completed_io_buffers_;
-
   /// Filter contexts for all filters applicable to this scan. Memory attached to the
   /// context is owned by the scan node.
   std::vector<FilterContext> filter_ctxs_;

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/exec/scanner-context.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.inline.h b/be/src/exec/scanner-context.inline.h
index 5c6f049..f4f3bcb 100644
--- a/be/src/exec/scanner-context.inline.h
+++ b/be/src/exec/scanner-context.inline.h
@@ -46,8 +46,7 @@ inline bool ScannerContext::Stream::GetBytes(int64_t requested_len, uint8_t** bu
     *buffer = *output_buffer_pos_;
     if (LIKELY(!peek)) {
       total_bytes_returned_ += *out_len;
-      *output_buffer_pos_ += *out_len;
-      *output_buffer_bytes_left_ -= *out_len;
+      AdvanceBufferPos(*out_len, output_buffer_pos_, output_buffer_bytes_left_);
     }
     return true;
   }
@@ -68,24 +67,43 @@ inline bool ScannerContext::Stream::ReadBytes(int64_t length, uint8_t** buf,
   return true;
 }
 
-/// TODO: consider implementing a Skip in the context/stream object that's more
-/// efficient than GetBytes.
 inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
-  uint8_t* dummy_buf;
-  int64_t bytes_read;
-  RETURN_IF_FALSE(GetBytes(length, &dummy_buf, &bytes_read, status));
-  if (UNLIKELY(length != bytes_read)) {
-    DCHECK_LT(bytes_read, length);
-    *status = ReportIncompleteRead(length, bytes_read);
-    return false;
+  int64_t bytes_left = length;
+  // Skip bytes from the boundary buffer first.
+  if (boundary_buffer_bytes_left_ > 0) {
+    DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
+    DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
+    int64_t boundary_buffer_bytes_to_skip =
+        std::min(bytes_left, boundary_buffer_bytes_left_);
+    AdvanceBufferPos(boundary_buffer_bytes_to_skip, &boundary_buffer_pos_,
+        &boundary_buffer_bytes_left_);
+    bytes_left -= boundary_buffer_bytes_to_skip;
+    total_bytes_returned_ += boundary_buffer_bytes_to_skip;
+    if (boundary_buffer_bytes_left_ == 0 && io_buffer_bytes_left_ > 0) {
+      output_buffer_pos_ = &io_buffer_pos_;
+      output_buffer_bytes_left_ = &io_buffer_bytes_left_;
+    }
+    if (bytes_left == 0) return true;
   }
-  return true;
+  // Skip bytes from the I/O buffer second.
+  if (io_buffer_bytes_left_ > 0) {
+    int64_t io_buffer_bytes_to_skip = std::min(bytes_left, io_buffer_bytes_left_);
+    AdvanceBufferPos(io_buffer_bytes_to_skip, &io_buffer_pos_, &io_buffer_bytes_left_);
+    bytes_left -= io_buffer_bytes_to_skip;
+    total_bytes_returned_ += io_buffer_bytes_to_skip;
+    if (bytes_left == 0) return true;
+  }
+  DCHECK(ValidateBufferPointers());
+  DCHECK_GT(bytes_left, 0);
+  // Slow path: need to skip data in subsequent buffers.
+  return SkipBytesInternal(length, bytes_left, status);
 }
 
 inline bool ScannerContext::Stream::SkipText(Status* status) {
-  uint8_t* dummy_buffer;
-  int64_t bytes_read;
-  return ReadText(&dummy_buffer, &bytes_read, status);
+  int64_t len;
+  RETURN_IF_FALSE(ReadVLong(&len, status));
+  RETURN_IF_FALSE(SkipBytes(len, status));
+  return true;
 }
 
 inline bool ScannerContext::Stream::ReadText(uint8_t** buf, int64_t* len,
@@ -166,6 +184,13 @@ inline bool ScannerContext::Stream::ReadZLong(int64_t* value, Status* status) {
   return true;
 }
 
+inline void ScannerContext::Stream::AdvanceBufferPos(int64_t bytes,
+    uint8_t** buffer_pos, int64_t* buffer_bytes_left) {
+  DCHECK_LE(bytes, *buffer_bytes_left);
+  *buffer_pos += bytes;
+  *buffer_bytes_left -= bytes;
+}
+
 #undef RETURN_IF_FALSE
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/fd5c3a7e/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 4854bd6..f3d69f1 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -659,6 +659,7 @@ unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
       buffer = free_buffers_[idx].front();
       free_buffers_[idx].pop_front();
       free_buffer_mem_tracker_->Release(buffer_size);
+      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
     }
   }
 
@@ -682,6 +683,7 @@ void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
       uint8_t* buffer = free_buffers->front();
       free_buffers->pop_front();
       int64_t buffer_size = (1LL << idx) * min_buffer_size_;
+      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
       delete[] buffer;
       free_buffer_mem_tracker_->Release(buffer_size);
       num_allocated_buffers_.Add(-1);
@@ -717,6 +719,8 @@ void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
     unique_lock<mutex> lock(free_buffers_lock_);
     if (!FLAGS_disable_mem_pools &&
         free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
+      // Poison buffers stored in cache.
+      ASAN_POISON_MEMORY_REGION(buffer, buffer_size);
       free_buffers_[idx].push_back(buffer);
       if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
         ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);