You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/07/03 00:29:13 UTC

[3/6] incubator-impala git commit: IMPALA-3905: HdfsScanner::GetNext() for Avro, RC, and Seq scans.

IMPALA-3905: HdfsScanner::GetNext() for Avro, RC, and Seq scans.

Implements HdfsScanner::GetNext() for the Avro, RC File, and
Sequence File scanners. Changes ProcessSplit() to repeatedly call
GetNext() to share the core scanning code between the legacy
ProcessSplit() interface (ProcessSplit()) and the new GetNext()
interface.

Summary of changes:
- Slightly change code flow for initial scan range that
  only parses the file header. The new code sets
  'only_parsing_header_' in Open() and then honors
  that flag in GetNextInternal(). Before, all the logic
  was inside ProcessSpit().
- Replace 'finished_' with 'eos_'.
- Add a RowBatch parameter to various functions.
- Change Close() to free all resources when a nullptr
  RowBatch is passed.

Testing:
- Exhaustive tests passed on debug
- Core tests passed on asan
- TODO: Perf testing on cluster

Change-Id: Ie18f57b0d3fe0052a8ccd361b6a5fcdf979d0669
Reviewed-on: http://gerrit.cloudera.org:8080/6527
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/931bf49c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/931bf49c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/931bf49c

Branch: refs/heads/master
Commit: 931bf49cd90e496df6bf260ae668ec6944f0016c
Parents: 5007159
Author: Alex Behm <al...@cloudera.com>
Authored: Thu Mar 9 10:06:54 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Jul 1 21:59:34 2017 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc            | 130 ++++++-------
 be/src/exec/base-sequence-scanner.h             |  96 +++++-----
 be/src/exec/exec-node.cc                        |  30 ++-
 be/src/exec/exec-node.h                         |  16 +-
 be/src/exec/hdfs-avro-scanner-ir.cc             |   2 +-
 be/src/exec/hdfs-avro-scanner.cc                | 190 +++++++++----------
 be/src/exec/hdfs-avro-scanner.h                 |  81 ++++----
 be/src/exec/hdfs-parquet-scanner.cc             |  26 +--
 be/src/exec/hdfs-parquet-scanner.h              |  44 +++--
 be/src/exec/hdfs-rcfile-scanner.cc              | 185 +++++++++---------
 be/src/exec/hdfs-rcfile-scanner.h               |  53 +++---
 be/src/exec/hdfs-scan-node-base.cc              |   7 +-
 be/src/exec/hdfs-scan-node-base.h               |  11 +-
 be/src/exec/hdfs-scan-node-mt.h                 |   7 +-
 be/src/exec/hdfs-scan-node.cc                   |  20 +-
 be/src/exec/hdfs-scan-node.h                    |  23 ++-
 be/src/exec/hdfs-scanner.cc                     |  93 +++------
 be/src/exec/hdfs-scanner.h                      | 150 +++++----------
 be/src/exec/hdfs-sequence-scanner.cc            | 138 +++++++-------
 be/src/exec/hdfs-sequence-scanner.h             | 139 +++++++-------
 be/src/exec/hdfs-text-scanner.cc                |  29 +--
 be/src/exec/hdfs-text-scanner.h                 |  32 ++--
 be/src/exec/kudu-scan-node.cc                   |  15 +-
 be/src/exec/scan-node.h                         |   2 +-
 be/src/util/blocking-queue.h                    |   3 +-
 .../queries/DataErrorsTest/avro-errors.test     |   3 +-
 26 files changed, 705 insertions(+), 820 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index ca9b0c9..25fed0b 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
 #include <boost/bind.hpp>
 
 #include "exec/base-sequence-scanner.h"
 
+#include "exec/hdfs-scan-node-base.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/scanner-context.inline.h"
 #include "runtime/runtime-state.h"
@@ -60,28 +62,17 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         DiskIoMgr::BufferOpts::Uncached());
     header_ranges.push_back(header_range);
   }
-  // Issue the header ranges only.  ProcessSplit() will issue the files' scan ranges
+  // Issue the header ranges only. GetNextInternal() will issue the files' scan ranges
   // and those ranges will need scanner threads, so no files are marked completed yet.
   RETURN_IF_ERROR(scan_node->AddDiskIoRanges(header_ranges, 0));
   return Status::OK();
 }
 
 BaseSequenceScanner::BaseSequenceScanner(HdfsScanNodeBase* node, RuntimeState* state)
-  : HdfsScanner(node, state),
-    header_(NULL),
-    only_parsing_header_(false),
-    block_start_(0),
-    total_block_size_(0),
-    num_syncs_(0) {
+  : HdfsScanner(node, state) {
 }
 
-BaseSequenceScanner::BaseSequenceScanner()
-  : HdfsScanner(),
-    header_(NULL),
-    only_parsing_header_(false),
-    block_start_(0),
-    total_block_size_(0),
-    num_syncs_(0) {
+BaseSequenceScanner::BaseSequenceScanner() : HdfsScanner() {
   DCHECK(TestInfo::is_test());
 }
 
@@ -93,8 +84,26 @@ Status BaseSequenceScanner::Open(ScannerContext* context) {
   stream_->set_read_past_size_cb(bind(&BaseSequenceScanner::ReadPastSize, this, _1));
   bytes_skipped_counter_ = ADD_COUNTER(
       scan_node_->runtime_profile(), "BytesSkipped", TUnit::BYTES);
-  // Allocate a new row batch. May fail if mem limit is exceeded.
-  RETURN_IF_ERROR(StartNewRowBatch());
+
+  header_ = reinterpret_cast<FileHeader*>(
+      scan_node_->GetFileMetadata(stream_->filename()));
+  if (header_ == nullptr) {
+    only_parsing_header_ = true;
+    return Status::OK();
+  }
+
+  // If the file is compressed, the buffers in the stream_ are not used directly.
+  if (header_->is_compressed) stream_->set_contains_tuple_data(false);
+  RETURN_IF_ERROR(InitNewRange());
+
+  // Skip to the first record
+  if (stream_->file_offset() < header_->header_size) {
+    // If the scan range starts within the header, skip to the end of the header so we
+    // don't accidentally skip to an extra sync within the header
+    RETURN_IF_FALSE(stream_->SkipBytes(
+        header_->header_size - stream_->file_offset(), &parse_status_));
+  }
+  RETURN_IF_ERROR(SkipToSync(header_->sync, SYNC_HASH_SIZE));
   return Status::OK();
 }
 
@@ -105,43 +114,39 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
             << (num_syncs_ > 1 ? total_block_size_ / (num_syncs_ - 1) : 0);
   // Need to close the decompressor before releasing the resources at AddFinalRowBatch(),
   // because in some cases there is memory allocated in decompressor_'s temp_memory_pool_.
-  if (decompressor_.get() != NULL) {
+  if (decompressor_.get() != nullptr) {
     decompressor_->Close();
-    decompressor_.reset(NULL);
+    decompressor_.reset();
   }
-  if (row_batch != NULL) {
+  if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
-      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+        unique_ptr<RowBatch>(row_batch));
     }
-  }
-  // Transfer template tuple pool to scan node pool. The scanner may be closed and
-  // subsequently re-used for another range, so we need to ensure that the template
-  // tuples are backed by live memory.
-  if (template_tuple_pool_.get() != NULL) {
-    static_cast<HdfsScanNode*>(scan_node_)->TransferToScanNodePool(
-        template_tuple_pool_.get());
+  } else {
+    data_buffer_pool_->FreeAll();
+    template_tuple_pool_->FreeAll();
+    context_->ReleaseCompletedResources(nullptr, true);
   }
 
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
-  // 'header_' can be NULL if HdfsScanNodeBase::CreateAndOpenScanner() failed.
-  if (!only_parsing_header_ && header_ != NULL) {
+  // 'header_' can be nullptr if HdfsScanNodeBase::CreateAndOpenScanner() failed.
+  if (!only_parsing_header_ && header_ != nullptr) {
     scan_node_->RangeComplete(file_format(), header_->compression_type);
   }
-  HdfsScanner::Close(row_batch);
+  CloseInternal();
 }
 
-Status BaseSequenceScanner::ProcessSplit() {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  header_ = reinterpret_cast<FileHeader*>(
-      static_cast<HdfsScanNode*>(scan_node_)->GetFileMetadata(stream_->filename()));
-  if (header_ == NULL) {
-    // This is the initial scan range just to parse the header
-    only_parsing_header_ = true;
+Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
+  if (only_parsing_header_) {
+    DCHECK(header_ == nullptr);
+    eos_ = true;
     header_ = state_->obj_pool()->Add(AllocateFileHeader());
     Status status = ReadFileHeader();
     if (!status.ok()) {
@@ -150,36 +155,23 @@ Status BaseSequenceScanner::ProcessSplit() {
       CloseFileRanges(stream_->filename());
       return Status::OK();
     }
-
-    // Header is parsed, set the metadata in the scan node and issue more ranges
-    static_cast<HdfsScanNode*>(scan_node_)->SetFileMetadata(
+    // Header is parsed, set the metadata in the scan node and issue more ranges.
+    static_cast<HdfsScanNodeBase*>(scan_node_)->SetFileMetadata(
         stream_->filename(), header_);
     HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
     RETURN_IF_ERROR(scan_node_->AddDiskIoRanges(desc));
     return Status::OK();
   }
+  if (eos_) return Status::OK();
 
-  // Initialize state for new scan range
-  finished_ = false;
-  // If the file is compressed, the buffers in the stream_ are not used directly.
-  if (header_->is_compressed) stream_->set_contains_tuple_data(false);
-  RETURN_IF_ERROR(InitNewRange());
-
-  Status status = Status::OK();
-
-  // Skip to the first record
-  if (stream_->file_offset() < header_->header_size) {
-    // If the scan range starts within the header, skip to the end of the header so we
-    // don't accidentally skip to an extra sync within the header
-    RETURN_IF_FALSE(stream_->SkipBytes(
-        header_->header_size - stream_->file_offset(), &parse_status_));
-  }
-  RETURN_IF_ERROR(SkipToSync(header_->sync, SYNC_HASH_SIZE));
+  int64_t tuple_buffer_size;
+  RETURN_IF_ERROR(
+      row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
+  tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
+  DCHECK_GT(row_batch->capacity(), 0);
 
-  // Process Range.
-  while (!finished_) {
-    status = ProcessRange();
-    if (status.ok()) break;
+  Status status = ProcessRange(row_batch);
+  if (!status.ok()) {
     if (status.IsCancelled() || status.IsMemLimitExceeded()) return status;
 
     // Log error from file format parsing.
@@ -201,19 +193,16 @@ Status BaseSequenceScanner::ProcessSplit() {
     RETURN_IF_ERROR(status);
     DCHECK(parse_status_.ok());
   }
-
-  // All done with this scan range.
   return Status::OK();
 }
 
 Status BaseSequenceScanner::ReadSync() {
-  // We are finished when we read a sync marker occurring completely in the next
-  // scan range
-  finished_ = stream_->eosr();
-
   uint8_t* hash;
   int64_t out_len;
-  RETURN_IF_FALSE(stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_));
+  bool success = stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_);
+  // We are done when we read a sync marker occurring completely in the next scan range.
+  eos_ = stream_->eosr() || stream_->eof();
+  if (!success) return parse_status_;
   if (out_len != SYNC_HASH_SIZE) {
     return Status(Substitute("Hit end of stream after reading $0 bytes of $1-byte "
         "synchronization marker", out_len, SYNC_HASH_SIZE));
@@ -226,7 +215,6 @@ Status BaseSequenceScanner::ReadSync() {
         << ReadWriteUtil::HexDump(hash, SYNC_HASH_SIZE) << "'";
     return Status(ss.str());
   }
-  finished_ |= stream_->eof();
   total_block_size_ += stream_->file_offset() - block_start_;
   block_start_ = stream_->file_offset();
   ++num_syncs_;
@@ -293,21 +281,21 @@ Status BaseSequenceScanner::SkipToSync(const uint8_t* sync, int sync_size) {
   if (offset == -1) {
     // No more syncs in this scan range
     DCHECK(stream_->eosr());
-    finished_ = true;
+    eos_ = true;
     return Status::OK();
   }
   DCHECK_GE(offset, sync_size);
 
   // Make sure sync starts in our scan range
   if (offset - sync_size >= stream_->bytes_left()) {
-    finished_ = true;
+    eos_ = true;
     return Status::OK();
   }
 
   RETURN_IF_FALSE(stream_->SkipBytes(offset, &parse_status_));
   VLOG_FILE << "Found sync for: " << stream_->filename()
             << " at " << stream_->file_offset() - sync_size;
-  if (stream_->eof()) finished_ = true;
+  if (stream_->eof()) eos_ = true;
   block_start_ = stream_->file_offset();
   ++num_syncs_;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index a909e00..785182e 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -30,20 +30,30 @@ namespace impala {
 struct HdfsFileDesc;
 class ScannerContext;
 
-/// Superclass for all sequence container based file formats:
-/// e.g. SequenceFile, RCFile, Avro
-/// Sequence container formats have sync markers periodically in the file.
-/// This class is will skip to the start of sync markers for errors and
-/// hdfs splits.
+/// Superclass for all sequence container based file formats: SequenceFile, RCFile, Avro.
+/// Sequence container formats have sync markers periodically in the file. This scanner
+/// recovers from corrupt or otherwise non-parsable data blocks by skipping to the next
+/// sync marker in the range.
+///
+/// Handling of sync markers:
+/// A scanner is responsible for the data with the first complete sync in its scan range
+/// through the first complete sync in the next range. 'eos_' is set to true when this
+/// scanner has processed all the bytes it is responsible for, i.e., when it reads a sync
+/// occurring completely in the next scan range, as this is the first sync that the next
+/// scan range will be able to locate. Note that checking context_->eosr() after reading
+/// each sync is insufficient for determining 'eos_'. If a sync marker spans two scan
+/// ranges, the first scan range must process the following block since the second scan
+/// range cannot find the incomplete sync. context_->eosr() will not alert us to this
+/// situation, causing the block to be incorrectly skipped.
 class BaseSequenceScanner : public HdfsScanner {
  public:
   /// Issue the initial ranges for all sequence container files.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-                                   const std::vector<HdfsFileDesc*>& files);
+                                   const std::vector<HdfsFileDesc*>& files)
+                                   WARN_UNUSED_RESULT;
 
-  virtual Status Open(ScannerContext* context);
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch);
-  virtual Status ProcessSplit();
 
   virtual ~BaseSequenceScanner();
 
@@ -73,14 +83,14 @@ class BaseSequenceScanner : public HdfsScanner {
     int64_t header_size;
   };
 
+  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
+
   /// Subclasses must implement these functions.  The order for calls will be
   ///  1. AllocateFileHeader() - called once per file
   ///  2. ReadFileHeader() - called once per file
-  ///  3. InitNewRange()
-  ///  4. ProcessRange()
-  /// In the normal case, 3 and 4 are called for each scan range once.  In the
-  /// case of errors and skipped bytes, 4 is repeatedly called, each time
-  /// starting right after the sync marker.
+  ///  3. InitNewRange() - called once per scan range
+  ///  4. ProcessRange()* - called until eos, bytes may be skipped between calls to
+  ///                       recover from parse errors
 
   /// Allocate a file header object for this scanner.  If the scanner needs
   /// additional header information, it should subclass FileHeader.
@@ -90,34 +100,31 @@ class BaseSequenceScanner : public HdfsScanner {
   /// Read the file header.  The underlying ScannerContext is at the start of
   /// the file header.  This function must read the file header (which advances
   /// context_ past it) and initialize header_.
-  virtual Status ReadFileHeader() = 0;
+  virtual Status ReadFileHeader() WARN_UNUSED_RESULT = 0;
 
-  /// Process the current range until the end or an error occurred.  Note this might
-  /// be called multiple times if we skip over bad data.
-  /// This function should read from the underlying ScannerContext materializing
-  /// tuples to the context.  When this function is called, it is guaranteed to be
-  /// at the start of a data block (i.e. right after the sync marker).
-  virtual Status ProcessRange() = 0;
+  /// Materializes tuples into 'row_batch' by reading from the underlying ScannerContext.
+  /// Assumes that the 'stream_' is positioned in the data portion of the range, i.e.,
+  /// not at a sync marker or other metadata of the range. May set 'eos_'.
+  virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT = 0;
 
   /// Returns type of scanner: e.g. rcfile, seqfile
   virtual THdfsFileFormat::type file_format() const = 0;
 
   BaseSequenceScanner(HdfsScanNodeBase*, RuntimeState*);
 
-  /// Read and validate sync marker against header_->sync.  Returns non-ok if the sync
-  /// marker did not match. Scanners should always use this function to read sync markers,
-  /// otherwise finished() might not be updated correctly. If finished() returns true after
-  /// calling this function, scanners must not process any more records.
-  Status ReadSync();
+  /// Read sync marker from 'stream_' and validate against 'header_->sync'. Returns
+  /// non-ok if the sync marker did not match. Scanners should always use this function
+  /// to read sync markers, otherwise eos() might not be updated correctly. If eos()
+  /// returns true after calling this function, scanners must not process any more
+  /// records.
+  Status ReadSync() WARN_UNUSED_RESULT;
 
-  /// Utility function to advance past the next sync marker, reading bytes from stream_.
-  /// If no sync is found in the scan range, return Status::OK and sets finished_ to
-  /// true. It is safe to call this function past eosr.
+  /// Utility function to advance 'stream_' past the next sync marker. If no sync is
+  /// found in the scan range, returns OK and sets 'eos_' to true. It is safe to call
+  /// this function past eosr.
   /// - sync: sync marker to search for (does not include 0xFFFFFFFF prefix)
   /// - sync_size: number of bytes for sync
-  Status SkipToSync(const uint8_t* sync, int sync_size);
-
-  bool finished() { return finished_; }
+  Status SkipToSync(const uint8_t* sync, int sync_size) WARN_UNUSED_RESULT;
 
   /// Estimate of header size in bytes.  This is initial number of bytes to issue
   /// per file.  If the estimate is too low, more bytes will be read as necessary.
@@ -127,40 +134,25 @@ class BaseSequenceScanner : public HdfsScanner {
   const static int SYNC_MARKER;
 
   /// File header for this scan range.  This is not owned by the parent scan node.
-  FileHeader* header_;
+  FileHeader* header_ = nullptr;
 
   /// If true, this scanner object is only for processing the header.
-  bool only_parsing_header_;
+  bool only_parsing_header_ = false;
 
   /// Unit test constructor
   BaseSequenceScanner();
 
  private:
-  /// Set to true when this scanner has processed all the bytes it is responsible
-  /// for, i.e., when it reads a sync occurring completely in the next scan
-  /// range, as this is the first sync that the next scan range will be able to
-  /// locate. (Each scan range is responsible for the first complete sync in the
-  /// range through the first complete sync in the next range.)
-  //
-  /// We need this variable because checking context_->eosr() after reading each
-  /// sync is insufficient.  If a sync marker spans two scan ranges, the first
-  /// scan range must process the following block since the second scan range
-  /// cannot find the incomplete sync. context_->eosr() will not alert us to this
-  /// situation, causing the block to be skipped.
-  //
-  /// finished_ is set by ReadSync() and SkipToSync().
-  bool finished_;
-
   /// Byte offset from the start of the file for the current block. Note that block refers
   /// to all the data between two syncs.
-  int64_t block_start_;
+  int64_t block_start_ = 0;
 
   /// The total number of bytes in all blocks this scan range has processed (updated in
   /// SkipToSync(), so only includes blocks that were completely processed).
-  int total_block_size_;
+  int total_block_size_ = 0;
 
   /// The number of syncs seen by this scanner so far.
-  int num_syncs_;
+  int num_syncs_ = 0;
 
   /// Callback for stream_ to compute how much to read past the scan range. Returns the
   /// average number of bytes per block minus how far 'file_offset' is into the current
@@ -177,7 +169,7 @@ class BaseSequenceScanner : public HdfsScanner {
   void CloseFileRanges(const char* file);
 
   /// Number of bytes skipped when advancing to next sync on error.
-  RuntimeProfile::Counter* bytes_skipped_counter_;
+  RuntimeProfile::Counter* bytes_skipped_counter_ = nullptr;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 1505f71..5618fef 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -17,6 +17,7 @@
 
 #include "exec/exec-node.h"
 
+#include <memory>
 #include <sstream>
 #include <unistd.h>  // for sleep()
 
@@ -75,45 +76,38 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) {
 }
 
 ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
-  : BlockingQueue<RowBatch*>(max_batches) {
+  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {
 }
 
 ExecNode::RowBatchQueue::~RowBatchQueue() {
   DCHECK(cleanup_queue_.empty());
 }
 
-void ExecNode::RowBatchQueue::AddBatch(RowBatch* batch) {
-  if (!BlockingPut(batch)) {
+void ExecNode::RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (!BlockingPut(move(batch))) {
     lock_guard<SpinLock> l(lock_);
-    cleanup_queue_.push_back(batch);
+    cleanup_queue_.push_back(move(batch));
   }
 }
 
-bool ExecNode::RowBatchQueue::AddBatchWithTimeout(RowBatch* batch,
-    int64_t timeout_micros) {
-  return BlockingPutWithTimeout(batch, timeout_micros);
-}
-
-RowBatch* ExecNode::RowBatchQueue::GetBatch() {
-  RowBatch* result = NULL;
+unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
+  unique_ptr<RowBatch> result;
   if (BlockingGet(&result)) return result;
-  return NULL;
+  return unique_ptr<RowBatch>();
 }
 
 int ExecNode::RowBatchQueue::Cleanup() {
   int num_io_buffers = 0;
 
-  RowBatch* batch = NULL;
+  unique_ptr<RowBatch> batch = NULL;
   while ((batch = GetBatch()) != NULL) {
     num_io_buffers += batch->num_io_buffers();
-    delete batch;
+    batch.reset();
   }
 
   lock_guard<SpinLock> l(lock_);
-  for (list<RowBatch*>::iterator it = cleanup_queue_.begin();
-      it != cleanup_queue_.end(); ++it) {
-    num_io_buffers += (*it)->num_io_buffers();
-    delete *it;
+  for (const unique_ptr<RowBatch>& row_batch: cleanup_queue_) {
+    num_io_buffers += row_batch->num_io_buffers();
   }
   cleanup_queue_.clear();
   return num_io_buffers;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index d64872b..b6dcd6e 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_EXEC_EXEC_NODE_H
 #define IMPALA_EXEC_EXEC_NODE_H
 
+#include <memory>
 #include <sstream>
 #include <vector>
 
@@ -225,7 +226,7 @@ class ExecNode {
   /// Row batches that are added after Shutdown() are queued in another queue, which can
   /// be cleaned up during Close().
   /// All functions are thread safe.
-  class RowBatchQueue : public BlockingQueue<RowBatch*> {
+  class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
    public:
     /// max_batches is the maximum number of row batches that can be queued.
     /// When the queue is full, producers will block.
@@ -233,19 +234,12 @@ class ExecNode {
     ~RowBatchQueue();
 
     /// Adds a batch to the queue. This is blocking if the queue is full.
-    void AddBatch(RowBatch* batch);
-
-    /// Adds a batch to the queue. If the queue is full, this blocks until space becomes
-    /// available or 'timeout_micros' has elapsed.
-    /// Returns true if the element was added to the queue, false if it wasn't. If this
-    /// method returns false, the queue didn't take ownership of the batch and it must be
-    /// managed externally.
-    bool AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros) WARN_UNUSED_RESULT;
+    void AddBatch(std::unique_ptr<RowBatch> batch);
 
     /// Gets a row batch from the queue. Returns NULL if there are no more.
     /// This function blocks.
     /// Returns NULL after Shutdown().
-    RowBatch* GetBatch();
+    std::unique_ptr<RowBatch> GetBatch();
 
     /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
     /// after this is called.
@@ -257,7 +251,7 @@ class ExecNode {
     SpinLock lock_;
 
     /// Queue of orphaned row batches
-    std::list<RowBatch*> cleanup_queue_;
+    std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
   };
 
   /// Unique within a single plan tree.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index 2404d81..3c254cc 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -213,7 +213,7 @@ bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da
     if (ctype.IsVarLenStringType()) {
       StringValue* sv = reinterpret_cast<StringValue*>(slot);
       sv->ptr = reinterpret_cast<char*>(pool->TryAllocate(max_len));
-      if (UNLIKELY(sv->ptr == NULL)) {
+      if (UNLIKELY(sv->ptr == nullptr)) {
         string details = Substitute("HdfsAvroScanner::ReadAvroChar() failed to allocate"
             "$0 bytes for char slot.", max_len);
         parse_status_ = pool->mem_tracker()->MemLimitExceeded(state_, details, max_len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 0df7c46..6771e63 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -56,21 +56,17 @@ const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
 HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
-  : BaseSequenceScanner(scan_node, state),
-    avro_header_(NULL),
-    codegend_decode_avro_data_(NULL) {
+  : BaseSequenceScanner(scan_node, state) {
 }
 
 HdfsAvroScanner::HdfsAvroScanner()
-  : BaseSequenceScanner(),
-    avro_header_(NULL),
-    codegend_decode_avro_data_(NULL) {
+  : BaseSequenceScanner() {
   DCHECK(TestInfo::is_test());
 }
 
 Status HdfsAvroScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
-  if (scan_node_->avro_schema().schema == NULL) {
+  if (scan_node_->avro_schema().schema == nullptr) {
     return Status("Missing Avro schema in scan node. This could be due to stale "
         "metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
   }
@@ -79,16 +75,16 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
 
 Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ScalarExpr*>& conjuncts, Function** decode_avro_data_fn) {
-  *decode_avro_data_fn = NULL;
+  *decode_avro_data_fn = nullptr;
   DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != NULL);
-  Function* materialize_tuple_fn = NULL;
+  DCHECK(codegen != nullptr);
+  Function* materialize_tuple_fn = nullptr;
   RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn));
-  DCHECK(materialize_tuple_fn != NULL);
+  DCHECK(materialize_tuple_fn != nullptr);
   RETURN_IF_ERROR(CodegenDecodeAvroData(codegen, materialize_tuple_fn, conjuncts,
       decode_avro_data_fn));
-  DCHECK(*decode_avro_data_fn != NULL);
+  DCHECK(*decode_avro_data_fn != nullptr);
   return Status::OK();
 }
 
@@ -99,6 +95,7 @@ BaseSequenceScanner::FileHeader* HdfsAvroScanner::AllocateFileHeader() {
 }
 
 Status HdfsAvroScanner::ReadFileHeader() {
+  DCHECK(only_parsing_header_);
   avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
 
   // Check version header
@@ -119,6 +116,10 @@ Status HdfsAvroScanner::ReadFileHeader() {
   memcpy(header_->sync, sync, SYNC_HASH_SIZE);
 
   header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE;
+
+  // Transfer ownership so the memory remains valid for subsequent scanners that process
+  // the data portions of the file.
+  scan_node_->TransferToScanNodePool(template_tuple_pool_.get());
   return Status::OK();
 }
 
@@ -252,7 +253,7 @@ Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root,
         // This field doesn't exist in the file schema. Check if there is a default value.
         avro_datum_t default_value =
             avro_schema_record_field_default(table_record->schema, table_field_idx);
-        if (default_value == NULL) {
+        if (default_value == nullptr) {
           return Status(TErrorCode::AVRO_MISSING_DEFAULT, field_name);
         }
         RETURN_IF_ERROR(WriteDefaultValue(slot_desc, default_value, field_name));
@@ -287,8 +288,8 @@ Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root,
 
 Status HdfsAvroScanner::WriteDefaultValue(
     SlotDescriptor* slot_desc, avro_datum_t default_value, const char* field_name) {
-  if (avro_header_->template_tuple == NULL) {
-    if (template_tuple_ != NULL) {
+  if (avro_header_->template_tuple == nullptr) {
+    if (template_tuple_ != nullptr) {
       avro_header_->template_tuple = template_tuple_;
     } else {
       avro_header_->template_tuple =
@@ -303,35 +304,35 @@ Status HdfsAvroScanner::WriteDefaultValue(
       RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
       int8_t v;
       if (avro_boolean_get(default_value, &v)) DCHECK(false);
-      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
+      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
       break;
     }
     case AVRO_INT32: {
       RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
       int32_t v;
       if (avro_int32_get(default_value, &v)) DCHECK(false);
-      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
+      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
       break;
     }
     case AVRO_INT64: {
       RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
       int64_t v;
       if (avro_int64_get(default_value, &v)) DCHECK(false);
-      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
+      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
       break;
     }
     case AVRO_FLOAT: {
       RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
       float v;
       if (avro_float_get(default_value, &v)) DCHECK(false);
-      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
+      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
       break;
     }
     case AVRO_DOUBLE: {
       RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
       double v;
       if (avro_double_get(default_value, &v)) DCHECK(false);
-      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
+      RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
       break;
     }
     case AVRO_STRING:
@@ -467,7 +468,7 @@ bool HdfsAvroScanner::VerifyTypesMatch(
 }
 
 Status HdfsAvroScanner::InitNewRange() {
-  DCHECK(header_ != NULL);
+  DCHECK(header_ != nullptr);
   only_parsing_header_ = false;
   avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
   template_tuple_ = avro_header_->template_tuple;
@@ -479,7 +480,7 @@ Status HdfsAvroScanner::InitNewRange() {
     codegend_decode_avro_data_ = reinterpret_cast<DecodeAvroDataFn>(
         scan_node_->GetCodegenFn(THdfsFileFormat::AVRO));
   }
-  if (codegend_decode_avro_data_ == NULL) {
+  if (codegend_decode_avro_data_ == nullptr) {
     scan_node_->IncNumScannersCodegenDisabled();
   } else {
     VLOG(2) << "HdfsAvroScanner (node_id=" << scan_node_->id()
@@ -490,28 +491,21 @@ Status HdfsAvroScanner::InitNewRange() {
   return Status::OK();
 }
 
-Status HdfsAvroScanner::ProcessRange() {
-  while (!finished()) {
-    int64_t num_records;
-    uint8_t* compressed_data;
-    int64_t compressed_size;
-    uint8_t* data;
-    int64_t data_len;
-    uint8_t* data_end;
-
+Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
+  if (record_pos_ == num_records_in_block_) {
     // Read new data block
-    RETURN_IF_FALSE(
-        stream_->ReadZLong(&num_records, &parse_status_));
-    if (num_records < 0) {
+    RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block_, &parse_status_));
+    if (num_records_in_block_ < 0) {
       return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(),
-          num_records, stream_->file_offset());
+          num_records_in_block_, stream_->file_offset());
     }
-    DCHECK_GE(num_records, 0);
+    int64_t compressed_size;
     RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_));
     if (compressed_size < 0) {
       return Status(TErrorCode::AVRO_INVALID_COMPRESSED_SIZE, stream_->filename(),
           compressed_size, stream_->file_offset());
     }
+    uint8_t* compressed_data;
     RETURN_IF_FALSE(stream_->ReadBytes(
         compressed_size, &compressed_data, &parse_status_));
 
@@ -522,47 +516,45 @@ Status HdfsAvroScanner::ProcessRange() {
         compressed_size -= SnappyDecompressor::TRAILING_CHECKSUM_LEN;
       }
       SCOPED_TIMER(decompress_timer_);
-      RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size, compressed_data,
-          &data_len, &data));
-      VLOG_FILE << "Decompressed " << compressed_size << " to " << data_len;
+      RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size,
+          compressed_data, &data_block_len_, &data_block_));
     } else {
-      data = compressed_data;
-      data_len = compressed_size;
+      data_block_ = compressed_data;
+      data_block_len_ = compressed_size;
     }
-    data_end = data + data_len;
-
-    // Process block data
-    while (num_records > 0) {
-      SCOPED_TIMER(scan_node_->materialize_tuple_timer());
-
-      MemPool* pool;
-      Tuple* tuple;
-      TupleRow* tuple_row;
-      int max_tuples = GetMemory(&pool, &tuple, &tuple_row);
-      max_tuples = min<int64_t>(num_records, max_tuples);
-      int num_to_commit;
-      if (scan_node_->materialized_slots().empty()) {
-        // No slots to materialize (e.g. count(*)), no need to decode data
-        num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
-      } else {
-        if (codegend_decode_avro_data_ != NULL) {
-          num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data,
-              data_end, tuple, tuple_row);
-        } else {
-          num_to_commit = DecodeAvroData(max_tuples, pool, &data, data_end, tuple,
-              tuple_row);
-        }
-      }
-      RETURN_IF_ERROR(parse_status_);
-      RETURN_IF_ERROR(CommitRows(num_to_commit));
-      num_records -= max_tuples;
-      COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
+    data_block_end_ = data_block_ + data_block_len_;
+    record_pos_ = 0;
+  }
 
-      if (scan_node_->ReachedLimit()) return Status::OK();
+  // Process block data
+  while (record_pos_ != num_records_in_block_) {
+    SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+
+    Tuple* tuple = tuple_;
+    TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow());
+    int max_tuples = row_batch->capacity() - row_batch->num_rows();
+    max_tuples = min<int64_t>(max_tuples, num_records_in_block_ - record_pos_);
+    int num_to_commit;
+    if (scan_node_->materialized_slots().empty()) {
+      // No slots to materialize (e.g. count(*)), no need to decode data
+      num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
+    } else if (codegend_decode_avro_data_ != nullptr) {
+      num_to_commit = codegend_decode_avro_data_(this, max_tuples,
+          row_batch->tuple_data_pool(), &data_block_, data_block_end_, tuple, tuple_row);
+    } else {
+      num_to_commit = DecodeAvroData(max_tuples, row_batch->tuple_data_pool(),
+          &data_block_, data_block_end_, tuple, tuple_row);
     }
+    RETURN_IF_ERROR(parse_status_);
+    RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch));
+    record_pos_ += max_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+  }
 
-    if (decompressor_.get() != NULL && !decompressor_->reuse_output_buffer()) {
-      RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
+  if (record_pos_ == num_records_in_block_) {
+    if (decompressor_.get() != nullptr && !decompressor_->reuse_output_buffer()) {
+      row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
     }
     RETURN_IF_ERROR(ReadSync());
   }
@@ -578,9 +570,9 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
 
     const SlotDescriptor* slot_desc = element.slot_desc;
     bool write_slot = false;
-    void* slot = NULL;
+    void* slot = nullptr;
     PrimitiveType slot_type = INVALID_TYPE;
-    if (slot_desc != NULL) {
+    if (slot_desc != nullptr) {
       write_slot = true;
       slot = tuple->GetSlot(slot_desc->tuple_offset());
       slot_type = slot_desc->type().type;
@@ -598,7 +590,7 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
     bool success;
     switch (type) {
       case AVRO_NULL:
-        if (slot_desc != NULL) tuple->SetNull(slot_desc->null_indicator_offset());
+        if (slot_desc != nullptr) tuple->SetNull(slot_desc->null_indicator_offset());
         success = true;
         break;
       case AVRO_BOOLEAN:
@@ -618,10 +610,10 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
         break;
       case AVRO_STRING:
       case AVRO_BYTES:
-        if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
+        if (slot_desc != nullptr && slot_desc->type().type == TYPE_VARCHAR) {
           success = ReadAvroVarchar(slot_type, slot_desc->type().len, data, data_end,
               write_slot, slot, pool);
-        } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
+        } else if (slot_desc != nullptr && slot_desc->type().type == TYPE_CHAR) {
           success = ReadAvroChar(slot_type, slot_desc->type().len, data, data_end,
               write_slot, slot, pool);
         } else {
@@ -630,7 +622,7 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
         break;
       case AVRO_DECIMAL: {
         int slot_byte_size = 0;
-        if (slot_desc != NULL) {
+        if (slot_desc != nullptr) {
           DCHECK_EQ(slot_type, TYPE_DECIMAL);
           slot_byte_size = slot_desc->type().GetByteSize();
         }
@@ -782,12 +774,12 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(
   LlvmBuilder builder(context);
 
   Type* this_type = codegen->GetType(HdfsAvroScanner::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
+  DCHECK(this_type != nullptr);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
 
   TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
   StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
-  if (tuple_type == NULL) return Status("Could not generate tuple struct.");
+  if (tuple_type == nullptr) return Status("Could not generate tuple struct.");
   Type* tuple_ptr_type = PointerType::get(tuple_type, 0);
 
   Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
@@ -817,7 +809,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(
   Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr");
 
   // Create a bail out block to handle decoding failures.
-  BasicBlock* bail_out_block = BasicBlock::Create(context, "bail_out", fn, NULL);
+  BasicBlock* bail_out_block = BasicBlock::Create(context, "bail_out", fn, nullptr);
 
   Status status = CodegenReadRecord(
       SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, bail_out_block,
@@ -836,7 +828,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(
   builder.CreateRet(codegen->false_value());
 
   *materialize_tuple_fn = codegen->FinalizeFunction(fn);
-  if (*materialize_tuple_fn == NULL) {
+  if (*materialize_tuple_fn == nullptr) {
     return Status("Failed to finalize materialize_tuple_fn.");
   }
   return Status::OK();
@@ -847,7 +839,7 @@ Status HdfsAvroScanner::CodegenReadRecord(
     LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
     BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
     Value* data_val, Value* data_end_val) {
-  if (record.schema == NULL) {
+  if (record.schema == nullptr) {
     return Status("Missing Avro schema in scan node. This could be due to stale "
         "metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
   }
@@ -859,7 +851,7 @@ Status HdfsAvroScanner::CodegenReadRecord(
   // result.
 
   // Used to store result of ReadUnionType() call
-  Value* is_null_ptr = NULL;
+  Value* is_null_ptr = nullptr;
   for (int i = 0; i < record.children.size(); ++i) {
     const AvroSchemaElement* field = &record.children[i];
     int col_idx = i;
@@ -869,16 +861,16 @@ Status HdfsAvroScanner::CodegenReadRecord(
     SchemaPath new_path = path;
     new_path.push_back(col_idx);
     int slot_idx = node->GetMaterializedSlotIdx(new_path);
-    SlotDescriptor* slot_desc = (slot_idx == HdfsScanNode::SKIP_COLUMN) ?
-                                NULL : node->materialized_slots()[slot_idx];
+    SlotDescriptor* slot_desc = (slot_idx == HdfsScanNodeBase::SKIP_COLUMN) ?
+                                nullptr : node->materialized_slots()[slot_idx];
 
     // Block that calls appropriate Read<Type> function
     BasicBlock* read_field_block =
         BasicBlock::Create(context, "read_field", fn, insert_before);
 
-    // Block that handles a NULL value. We fill this in below if the field is nullable,
-    // otherwise we leave this block NULL.
-    BasicBlock* null_block = NULL;
+    // Block that handles a nullptr value. We fill this in below if the field is nullable,
+    // otherwise we leave this block nullptr.
+    BasicBlock* null_block = nullptr;
 
     // This is where we should end up after we're finished processing this field. Used to
     // put the builder in the right place for the next field.
@@ -890,7 +882,7 @@ Status HdfsAvroScanner::CodegenReadRecord(
       Function* read_union_fn = codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
       Value* null_union_pos_val =
           codegen->GetIntConstant(TYPE_INT, field->null_union_position);
-      if (is_null_ptr == NULL) {
+      if (is_null_ptr == nullptr) {
         is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->boolean_type(),
             "is_null_ptr");
       }
@@ -909,7 +901,7 @@ Status HdfsAvroScanner::CodegenReadRecord(
 
       // Write null field IR
       builder->SetInsertPoint(null_block);
-      if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
+      if (slot_idx != HdfsScanNodeBase::SKIP_COLUMN) {
         slot_desc->CodegenSetNullIndicator(
             codegen, builder, tuple_val, codegen->true_value());
       }
@@ -925,7 +917,7 @@ Status HdfsAvroScanner::CodegenReadRecord(
     Value *ret_val = nullptr;
     if (field->schema->type == AVRO_RECORD) {
       BasicBlock* insert_before_block =
-          (null_block != NULL) ? null_block : end_field_block;
+          (null_block != nullptr) ? null_block : end_field_block;
       RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, builder, fn,
           insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val,
           data_end_val));
@@ -965,9 +957,9 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
       break;
     case AVRO_STRING:
     case AVRO_BYTES:
-      if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
+      if (slot_desc != nullptr && slot_desc->type().type == TYPE_VARCHAR) {
         read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false);
-      } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
+      } else if (slot_desc != nullptr && slot_desc->type().type == TYPE_CHAR) {
         read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false);
       } else {
         read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false);
@@ -986,7 +978,7 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
   Value* write_slot_val = builder->getFalse();
   Value* slot_type_val = builder->getInt32(0);
   Value* opaque_slot_val = codegen->null_ptr_value();
-  if (slot_desc != NULL) {
+  if (slot_desc != nullptr) {
     // Field corresponds to a materialized column, fill in relevant arguments
     write_slot_val = builder->getTrue();
     if (slot_desc->type().type == TYPE_DECIMAL) {
@@ -995,14 +987,14 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
     } else {
       slot_type_val = builder->getInt32(slot_desc->type().type);
     }
-    Value* slot_val = builder->CreateStructGEP(NULL, tuple_val, slot_desc->llvm_field_idx(),
+    Value* slot_val = builder->CreateStructGEP(nullptr, tuple_val, slot_desc->llvm_field_idx(),
         "slot");
     opaque_slot_val =
         builder->CreateBitCast(slot_val, codegen->ptr_type(), "opaque_slot");
   }
 
   // NOTE: ReadAvroVarchar/Char has different signature than rest of read functions
-  if (slot_desc != NULL &&
+  if (slot_desc != nullptr &&
       (slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) {
     // Need to pass an extra argument (the length) to the codegen function.
     Value* fixed_len = builder->getInt32(slot_desc->type().len);
@@ -1021,7 +1013,7 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(LlvmCodeGen* codegen,
     Function* materialize_tuple_fn, const vector<ScalarExpr*>& conjuncts,
     Function** decode_avro_data_fn) {
   SCOPED_TIMER(codegen->codegen_timer());
-  DCHECK(materialize_tuple_fn != NULL);
+  DCHECK(materialize_tuple_fn != nullptr);
 
   Function* fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true);
 
@@ -1036,7 +1028,7 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(LlvmCodeGen* codegen,
 
   fn->setName("DecodeAvroData");
   *decode_avro_data_fn = codegen->FinalizeFunction(fn);
-  if (*decode_avro_data_fn == NULL) {
+  if (*decode_avro_data_fn == nullptr) {
     return Status("Failed to finalize decode_avro_data_fn.");
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index b09d87f..211b7c3 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -19,15 +19,15 @@
 #ifndef IMPALA_EXEC_HDFS_AVRO_SCANNER_H
 #define IMPALA_EXEC_HDFS_AVRO_SCANNER_H
 
-/// This scanner reads Avro object container files (i.e., Avro data files)
-/// located in HDFS and writes the content as tuples in the Impala in-memory
-/// representation of data (e.g. tuples, rows, row batches).
-//
+/// This scanner reads Avro object container files (i.e., Avro data files) and writes the
+/// content as tuples in the Impala in-memory representation of data (tuples, rows,
+/// row batches).
+///
 /// The specification for Avro files can be found at
 /// http://avro.apache.org/docs/current/spec.html (the current Avro version is
 /// 1.7.4 as of the time of this writing). At a high level, an Avro data file has
 /// the following structure:
-//
+///
 /// - Avro data file
 ///   - file header
 ///     - file version header
@@ -40,20 +40,20 @@
 ///     - size of objects in block (post-compression)
 ///     - serialized objects
 ///     - sync marker
-//
-//
+///
+///
 /// This implementation reads one data block at a time, using the schema from the file
 /// header to decode the serialized objects. If possible, non-materialized columns are
 /// skipped without being read. If codegen is enabled, we codegen a function based on the
 /// table schema that parses records, materializes them to tuples, and evaluates the
 /// conjuncts.
-//
+///
 /// The Avro C library is used to parse the file's schema and the table's schema, which are
 /// then resolved according to the Avro spec and transformed into our own schema
 /// representation (i.e. a list of SchemaElements). Schema resolution allows users to
 /// evolve the table schema and file schema(s) independently. The spec goes over all the
 /// rules for schema resolution, but in summary:
-//
+///
 /// - Record fields are matched by name (and thus can be reordered; the table schema
 ///   determines the order of the columns)
 /// - Fields in the file schema not present in the table schema are ignored
@@ -61,7 +61,7 @@
 ///   specified
 /// - Types can be "promoted" as follows:
 ///   int -> long -> float -> double
-//
+///
 /// TODO:
 /// - implement SkipComplex()
 /// - codegen a function per unique file schema, rather than just the table schema
@@ -91,25 +91,24 @@ class HdfsAvroScanner : public BaseSequenceScanner {
 
   HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
 
-  virtual Status Open(ScannerContext* context);
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
 
   /// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if
-  /// codegen was successful or NULL otherwise.
+  /// codegen was successful or nullptr otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
       const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** decode_avro_data_fn);
+      llvm::Function** decode_avro_data_fn)
+      WARN_UNUSED_RESULT;
 
  protected:
   /// Implementation of BaseSeqeunceScanner super class methods
   virtual FileHeader* AllocateFileHeader();
   /// TODO: check that file schema matches metadata schema
-  virtual Status ReadFileHeader();
-  virtual Status InitNewRange();
-  virtual Status ProcessRange();
+  virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
+  virtual Status InitNewRange() WARN_UNUSED_RESULT;
+  virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
-  virtual THdfsFileFormat::type file_format() const {
-    return THdfsFileFormat::AVRO;
-  }
+  virtual THdfsFileFormat::type file_format() const { return THdfsFileFormat::AVRO; }
 
  private:
   friend class HdfsAvroScannerTest;
@@ -119,9 +118,11 @@ class HdfsAvroScanner : public BaseSequenceScanner {
     ScopedAvroSchemaElement schema;
 
     /// Template tuple for this file containing partition key values and default values.
-    /// NULL if there are no materialized partition keys and no default values are
-    /// necessary (i.e., all materialized fields are present in the file schema).
-    /// template_tuple_ is set to this value.
+    /// Set to nullptr if there are no materialized partition keys and no default values
+    /// are necessary (i.e., all materialized fields are present in the file schema).
+    /// This tuple is created by the scanner processing the initial scan range with
+    /// the header. The ownership of memory is transferred to the scan-node pool,
+    /// such that it remains live when subsequent scanners process data ranges.
     Tuple* template_tuple;
 
     /// True if this file can use the codegen'd version of DecodeAvroData() (i.e. its
@@ -129,7 +130,16 @@ class HdfsAvroScanner : public BaseSequenceScanner {
     bool use_codegend_decode_avro_data;
   };
 
-  AvroFileHeader* avro_header_;
+  AvroFileHeader* avro_header_ = nullptr;
+
+  /// Current data block after decompression with its end and length.
+  uint8_t* data_block_ = nullptr;
+  uint8_t* data_block_end_ = nullptr;
+  int64_t data_block_len_ = 0;
+
+  /// Number of records in the current data block and the current record position.
+  int64_t num_records_in_block_ = 0;
+  int64_t record_pos_ = 0;
 
   /// Metadata keys
   static const std::string AVRO_SCHEMA_KEY;
@@ -143,28 +153,29 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   typedef int (*DecodeAvroDataFn)(HdfsAvroScanner*, int, MemPool*, uint8_t**, uint8_t*,
                                   Tuple*, TupleRow*);
 
-  /// The codegen'd version of DecodeAvroData() if available, NULL otherwise.
-  DecodeAvroDataFn codegend_decode_avro_data_;
+  /// The codegen'd version of DecodeAvroData() if available, nullptr otherwise.
+  DecodeAvroDataFn codegend_decode_avro_data_ = nullptr;
 
   /// Utility function for decoding and parsing file header metadata
-  Status ParseMetadata();
+  Status ParseMetadata() WARN_UNUSED_RESULT;
 
   /// Resolves the table schema (i.e. the reader schema) against the file schema (i.e. the
   /// writer schema), and sets the 'slot_desc' fields of the nodes of the file schema
   /// corresponding to materialized slots. Calls WriteDefaultValue() as
   /// appropriate. Returns a non-OK status if the schemas could not be resolved.
   Status ResolveSchemas(const AvroSchemaElement& table_root,
-                        AvroSchemaElement* file_root);
+      AvroSchemaElement* file_root) WARN_UNUSED_RESULT;
 
   // Returns Status::OK iff table_schema (the reader schema) can be resolved against
   // file_schema (the writer schema). field_name is used for error messages.
   Status VerifyTypesMatch(const AvroSchemaElement& table_schema,
-      const AvroSchemaElement& file_schema, const string& field_name);
+      const AvroSchemaElement& file_schema, const string& field_name) WARN_UNUSED_RESULT;
 
   /// Returns Status::OK iff a value with the given schema can be used to populate
   /// 'slot_desc', as if 'schema' were the writer schema and 'slot_desc' the reader
   /// schema. 'schema' can be either a avro_schema_t or avro_datum_t.
-  Status VerifyTypesMatch(SlotDescriptor* slot_desc, avro_obj_t* schema);
+  Status VerifyTypesMatch(SlotDescriptor* slot_desc, avro_obj_t* schema)
+      WARN_UNUSED_RESULT;
 
   /// Return true if reader_type can be used to read writer_type according to the Avro
   /// type promotion rules. Note that this does not handle nullability or TYPE_NULL.
@@ -175,7 +186,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// and default_value's types are incompatible or unsupported. field_name is used for
   /// error messages.
   Status WriteDefaultValue(SlotDescriptor* slot_desc, avro_datum_t default_value,
-      const char* field_name);
+      const char* field_name) WARN_UNUSED_RESULT;
 
   /// Decodes records and copies the data into tuples.
   /// Returns the number of tuples to be committed.
@@ -200,14 +211,15 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   static Status CodegenDecodeAvroData(LlvmCodeGen* codegen,
       llvm::Function* materialize_tuple_fn,
       const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** decode_avro_data_fn);
+      llvm::Function** decode_avro_data_fn)
+      WARN_UNUSED_RESULT;
 
   /// Codegens a version of MaterializeTuple() that reads records based on the table
   /// schema. Stores the resulting function in 'materialize_tuple_fn' if codegen was
   /// successful or returns an error.
   /// TODO: Codegen a function for each unique file schema.
   static Status CodegenMaterializeTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
-      llvm::Function** materialize_tuple_fn);
+      llvm::Function** materialize_tuple_fn) WARN_UNUSED_RESULT;
 
   /// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
   /// record.
@@ -228,13 +240,14 @@ class HdfsAvroScanner : public BaseSequenceScanner {
       LlvmCodeGen* codegen, void* builder, llvm::Function* fn,
       llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, llvm::Value* this_val,
       llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val,
-      llvm::Value* data_end_val);
+      llvm::Value* data_end_val) WARN_UNUSED_RESULT;
 
   /// Creates the IR for reading an Avro scalar at builder's current insert point.
   static Status CodegenReadScalar(const AvroSchemaElement& element,
       SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
       llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
-      llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val);
+      llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val)
+      WARN_UNUSED_RESULT;
 
   /// The following are cross-compiled functions for parsing a serialized Avro primitive
   /// type and writing it to a slot. They can also be used for skipping a field without

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 8522767..2689045 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -18,6 +18,7 @@
 #include "exec/hdfs-parquet-scanner.h"
 
 #include <limits> // for std::numeric_limits
+#include <memory>
 #include <queue>
 
 #include <gflags/gflags.h>
@@ -48,6 +49,7 @@
 #include "common/names.h"
 
 using llvm::Function;
+using std::move;
 using namespace impala;
 using namespace llvm;
 
@@ -168,6 +170,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
     process_footer_timer_stats_(NULL),
     num_cols_counter_(NULL),
+    num_stats_filtered_row_groups_counter_(NULL),
     num_row_groups_counter_(NULL),
     num_scanners_with_no_reads_counter_(NULL),
     num_dict_filtered_row_groups_counter_(NULL),
@@ -272,10 +275,11 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     FlushRowGroupResources(row_batch);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     if (scan_node_->HasRowBatchQueue()) {
-      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          unique_ptr<RowBatch>(row_batch));
     }
   } else {
-    if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll();
+    template_tuple_pool_->FreeAll();
     dictionary_pool_.get()->FreeAll();
     context_->ReleaseCompletedResources(nullptr, true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
@@ -327,7 +331,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
         local.considered, local.rejected);
   }
 
-  HdfsScanner::Close(row_batch);
+  CloseInternal();
 }
 
 // Get the start of the column.
@@ -404,19 +408,19 @@ Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
   do {
-    batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-        scan_node_->mem_tracker());
-    RETURN_IF_ERROR(GetNextInternal(batch_));
-    scan_node->AddMaterializedRowBatch(batch_);
+    unique_ptr<RowBatch> batch = unique_ptr<RowBatch>(
+        new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+        scan_node_->mem_tracker()));
+    Status status = GetNextInternal(batch.get());
+    // Always add batch to the queue because it may contain data referenced by previously
+    // appended batches.
+    scan_node->AddMaterializedRowBatch(move(batch));
+    RETURN_IF_ERROR(status);
     ++row_batches_produced_;
     if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
       CheckFiltersEffectiveness();
     }
   } while (!eos_ && !scan_node_->ReachedLimit());
-
-  // Transfer the remaining resources to this new batch in Close().
-  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-      scan_node_->mem_tracker());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index ad86dc8..b31d321 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -328,17 +328,19 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Issue just the footer range for each file.  We'll then parse the footer and pick
   /// out the columns we want.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-                                   const std::vector<HdfsFileDesc*>& files);
+                                   const std::vector<HdfsFileDesc*>& files)
+                                   WARN_UNUSED_RESULT;
 
-  virtual Status Open(ScannerContext* context);
-  virtual Status ProcessSplit();
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
+  virtual Status ProcessSplit() WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch);
 
   /// Codegen ProcessScratchBatch(). Stores the resulting function in
   /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
       const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** process_scratch_batch_fn);
+      llvm::Function** process_scratch_batch_fn)
+      WARN_UNUSED_RESULT;
 
   /// The repetition level is set to this value to indicate the end of a row group.
   static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
@@ -478,14 +480,14 @@ class HdfsParquetScanner : public HdfsScanner {
 
   const char* filename() const { return metadata_range_->file(); }
 
-  virtual Status GetNextInternal(RowBatch* row_batch);
+  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
   /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
   /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
   /// skipped, 'false' otherwise.
   Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
-      const parquet::RowGroup& row_group, bool* skip_row_group);
+      const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
 
   /// Check runtime filters' effectiveness every BATCHES_PER_FILTER_SELECTIVITY_CHECK
   /// row batches. Will update 'filter_stats_'.
@@ -496,7 +498,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// state. Only returns a non-OK status if a non-recoverable error is encountered
   /// (or abort_on_error is true). If OK is returned, 'parse_status_' is guaranteed
   /// to be OK as well.
-  Status NextRowGroup();
+  Status NextRowGroup() WARN_UNUSED_RESULT;
 
   /// Reads data using 'column_readers' to materialize top-level tuples into 'row_batch'.
   /// Returns a non-OK status if a non-recoverable error was encountered and execution
@@ -504,13 +506,13 @@ class HdfsParquetScanner : public HdfsScanner {
   /// May set *skip_row_group to indicate that the current row group should be skipped,
   /// e.g., due to a parse error, but execution should continue.
   Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
-      RowBatch* row_batch, bool* skip_row_group);
+      RowBatch* row_batch, bool* skip_row_group) WARN_UNUSED_RESULT;
 
   /// Commit num_rows to the given row batch.
   /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
   /// Scanner can call this with 0 rows to flush any pending resources (attached pools
   /// and io buffers) to minimize memory consumption.
-  Status CommitRows(RowBatch* dst_batch, int num_rows);
+  Status CommitRows(RowBatch* dst_batch, int num_rows) WARN_UNUSED_RESULT;
 
   /// Evaluates runtime filters and conjuncts (if any) against the tuples in
   /// 'scratch_batch_', and adds the surviving tuples to the given batch.
@@ -539,7 +541,8 @@ class HdfsParquetScanner : public HdfsScanner {
   /// 'filter_ctxs'. Return error status on failure. The generated function is returned
   /// via 'fn'.
   static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
-      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
+      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn)
+      WARN_UNUSED_RESULT;
 
   /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
   /// allocated from 'coll_value_builder'.
@@ -572,7 +575,7 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Process the file footer and parse file_metadata_.  This should be called with the
   /// last FOOTER_SIZE bytes in context_.
-  Status ProcessFooter();
+  Status ProcessFooter() WARN_UNUSED_RESULT;
 
   /// Populates 'column_readers' for the slots in 'tuple_desc', including creating child
   /// readers for any collections. Schema resolution is handled in this function as
@@ -580,7 +583,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// fields missing in the file.
   Status CreateColumnReaders(const TupleDescriptor& tuple_desc,
       const ParquetSchemaResolver& schema_resolver,
-      std::vector<ParquetColumnReader*>* column_readers);
+      std::vector<ParquetColumnReader*>* column_readers) WARN_UNUSED_RESULT;
 
   /// Returns the total number of scalar column readers in 'column_readers', including
   /// the children of collection readers.
@@ -597,25 +600,28 @@ class HdfsParquetScanner : public HdfsScanner {
   /// still need to iterate over every item in the collection to count them.
   Status CreateCountingReader(const SchemaPath& parent_path,
       const ParquetSchemaResolver& schema_resolver,
-      ParquetColumnReader** reader);
+      ParquetColumnReader** reader)
+      WARN_UNUSED_RESULT;
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
   /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
   /// should be the readers used to materialize a single tuple (i.e., column_readers_ or
   /// the children of a collection node).
   Status InitColumns(
-      int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers);
+      int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers)
+      WARN_UNUSED_RESULT;
 
   /// Initialize dictionaries for all column readers
-  Status InitDictionaries(const std::vector<ParquetColumnReader*>& column_readers);
+  Status InitDictionaries(const std::vector<ParquetColumnReader*>& column_readers)
+      WARN_UNUSED_RESULT;
 
   /// Performs some validation once we've reached the end of a row group to help detect
   /// bugs or bad input files.
   Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers,
-      int row_group_idx, int64_t rows_read);
+      int row_group_idx, int64_t rows_read) WARN_UNUSED_RESULT;
 
   /// Part of the HdfsScanner interface, not used in Parquet.
-  Status InitNewRange() { return Status::OK(); }
+  Status InitNewRange() WARN_UNUSED_RESULT { return Status::OK(); }
 
   /// Transfers the remaining resources backing tuples such as IO buffers and memory
   /// from mem pools to the given row batch. Closes all column readers.
@@ -627,7 +633,7 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Divides the column readers into dict_filterable_readers_ and
   /// non_dict_filterable_readers_. Allocates memory for dict_filter_tuple_backing_.
-  Status InitDictFilterStructures();
+  Status InitDictFilterStructures() WARN_UNUSED_RESULT;
 
   /// Returns true if all of the data pages in the column chunk are dictionary encoded
   bool IsDictionaryEncoded(const parquet::ColumnMetaData& col_metadata);
@@ -636,7 +642,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// to the dictionary values. Specifically, if any dictionary-encoded column has
   /// no values that pass the relevant conjuncts, then the row group can be skipped.
   Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
-      bool* skip_row_group);
+      bool* skip_row_group) WARN_UNUSED_RESULT;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 411270a..6851bd6 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -68,7 +68,7 @@ Status HdfsRCFileScanner::Open(ScannerContext* context) {
 }
 
 Status HdfsRCFileScanner::InitNewRange() {
-  DCHECK(header_ != NULL);
+  DCHECK(header_ != nullptr);
 
   only_parsing_header_ = false;
   row_group_buffer_size_ = 0;
@@ -82,7 +82,7 @@ Status HdfsRCFileScanner::InitNewRange() {
   stream_->set_contains_tuple_data(false);
 
   if (header_->is_compressed) {
-    RETURN_IF_ERROR(Codec::CreateDecompressor(NULL,
+    RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr,
         reuse_row_group_buffer_, header_->codec, &decompressor_));
   }
 
@@ -95,7 +95,7 @@ Status HdfsRCFileScanner::InitNewRange() {
     if (i < num_table_cols) {
       int col_idx = i + scan_node_->num_partition_keys();
       columns_[i].materialize_column = scan_node_->GetMaterializedSlotIdx(
-          vector<int>(1, col_idx)) != HdfsScanNode::SKIP_COLUMN;
+          vector<int>(1, col_idx)) != HdfsScanNodeBase::SKIP_COLUMN;
     } else {
       // Treat columns not found in table metadata as extra unmaterialized columns
       columns_[i].materialize_column = false;
@@ -107,10 +107,10 @@ Status HdfsRCFileScanner::InitNewRange() {
 }
 
 Status HdfsRCFileScanner::ReadFileHeader() {
-  uint8_t* header;
-
   RcFileHeader* rc_header = reinterpret_cast<RcFileHeader*>(header_);
+
   // Validate file version
+  uint8_t* header;
   RETURN_IF_FALSE(stream_->ReadBytes(
       sizeof(RCFILE_VERSION_HEADER), &header, &parse_status_));
   if (!memcmp(header, HdfsSequenceScanner::SEQFILE_VERSION_HEADER,
@@ -230,7 +230,7 @@ BaseSequenceScanner::FileHeader* HdfsRCFileScanner::AllocateFileHeader() {
   return new RcFileHeader;
 }
 
-Status HdfsRCFileScanner::ResetRowGroup() {
+Status HdfsRCFileScanner::StartRowGroup() {
   num_rows_ = 0;
   row_pos_ = 0;
   key_length_ = 0;
@@ -246,17 +246,6 @@ Status HdfsRCFileScanner::ResetRowGroup() {
     columns_[i].current_field_len_rep = 0;
   }
 
-  // We are done with this row group, pass along external buffers if necessary.
-  if (!reuse_row_group_buffer_) {
-    RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
-    row_group_buffer_size_ = 0;
-  }
-  return Status::OK();
-}
-
-Status HdfsRCFileScanner::ReadRowGroup() {
-  RETURN_IF_ERROR(ResetRowGroup());
-
   while (num_rows_ == 0) {
     RETURN_IF_ERROR(ReadRowGroupHeader());
     RETURN_IF_ERROR(ReadKeyBuffers());
@@ -267,7 +256,7 @@ Status HdfsRCFileScanner::ReadRowGroup() {
       // The row group length depends on the user data and can be very big. This
       // can cause us to go way over the mem limit so use TryAllocate instead.
       row_group_buffer_ = data_buffer_pool_->TryAllocate(row_group_length_);
-      if (UNLIKELY(row_group_buffer_ == NULL)) {
+      if (UNLIKELY(row_group_buffer_ == nullptr)) {
         string details("RC file scanner failed to allocate row group buffer.");
         return scan_node_->mem_tracker()->MemLimitExceeded(state_, details,
             row_group_length_);
@@ -452,100 +441,104 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
   return Status::OK();
 }
 
-Status HdfsRCFileScanner::ProcessRange() {
-  RETURN_IF_ERROR(ResetRowGroup());
-
+Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
   // HdfsRCFileScanner effectively does buffered IO, in that it reads all the
   // materialized columns into a row group buffer.
   // It will then materialize tuples from the row group buffer.  When the row
   // group is complete, it will move onto the next row group.
-  while (!finished()) {
-    DCHECK_EQ(num_rows_, row_pos_);
-    // Finished materializing this row group, read the next one.
-    RETURN_IF_ERROR(ReadRowGroup());
-    if (num_rows_ == 0) break;
+  if (row_pos_ == num_rows_) {
+    // Finished materializing the current row group, read the next one.
+    RETURN_IF_ERROR(StartRowGroup());
+    if (num_rows_ == 0) {
+      eos_ = true;
+      return Status::OK();
+    }
+  }
+
+  while (row_pos_ != num_rows_) {
+    SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+
+    // Materialize rows from this row group in row batch sizes
+    Tuple* tuple = tuple_;
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int max_tuples = row_batch->capacity() - row_batch->num_rows();
+    max_tuples = min(max_tuples, num_rows_ - row_pos_);
+
+    const vector<SlotDescriptor*>& materialized_slots =
+        scan_node_->materialized_slots();
+    if (materialized_slots.empty()) {
+      // If there are no materialized slots (e.g. count(*) or just partition cols)
+      // we can shortcircuit the parse loop
+      row_pos_ += max_tuples;
+      int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+      COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
+      RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch));
+      if (row_batch->AtCapacity()) break;
+      continue;
+    }
 
-    while (num_rows_ != row_pos_) {
-      SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+    int num_to_commit = 0;
+    for (int i = 0; i < max_tuples; ++i) {
+      RETURN_IF_ERROR(NextRow());
+      InitTuple(template_tuple_, tuple);
 
-      // Indicates whether the current row has errors.
       bool error_in_row = false;
-      const vector<SlotDescriptor*>& materialized_slots =
-          scan_node_->materialized_slots();
-      vector<SlotDescriptor*>::const_iterator it;
-
-      // Materialize rows from this row group in row batch sizes
-      MemPool* pool;
-      Tuple* tuple;
-      TupleRow* current_row;
-      int max_tuples = GetMemory(&pool, &tuple, &current_row);
-      max_tuples = min(max_tuples, num_rows_ - row_pos_);
-
-      if (materialized_slots.empty()) {
-        // If there are no materialized slots (e.g. count(*) or just partition cols)
-        // we can shortcircuit the parse loop
-        row_pos_ += max_tuples;
-        int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
-        COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
-        RETURN_IF_ERROR(CommitRows(num_to_commit));
-        continue;
-      }
+      for (const SlotDescriptor* slot_desc: materialized_slots) {
+        int file_column_idx = slot_desc->col_pos() - scan_node_->num_partition_keys();
 
-      int num_to_commit = 0;
-      for (int i = 0; i < max_tuples; ++i) {
-        RETURN_IF_ERROR(NextRow());
-
-        // Initialize tuple from the partition key template tuple before writing the
-        // slots
-        InitTuple(template_tuple_, tuple);
-
-        for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) {
-          const SlotDescriptor* slot_desc = *it;
-          int file_column_idx = slot_desc->col_pos() - scan_node_->num_partition_keys();
-
-          // Set columns missing in this file to NULL
-          if (file_column_idx >= columns_.size()) {
-            tuple->SetNull(slot_desc->null_indicator_offset());
-            continue;
-          }
-
-          ColumnInfo& column = columns_[file_column_idx];
-          DCHECK(column.materialize_column);
-
-          const char* col_start = reinterpret_cast<const char*>(
-              row_group_buffer_ + column.start_offset + column.buffer_pos);
-          int field_len = column.current_field_len;
-          DCHECK_LE(col_start + field_len,
-              reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));
-
-          if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
-              false, false, pool)) {
-            ReportColumnParseError(slot_desc, col_start, field_len);
-            error_in_row = true;
-          }
+        // Set columns missing in this file to NULL
+        if (file_column_idx >= columns_.size()) {
+          tuple->SetNull(slot_desc->null_indicator_offset());
+          continue;
         }
 
-        if (error_in_row) {
-          error_in_row = false;
-          ErrorMsg msg(TErrorCode::GENERAL, Substitute("file: $0", stream_->filename()));
-          RETURN_IF_ERROR(state_->LogOrReturnError(msg));
-        }
+        const ColumnInfo& column = columns_[file_column_idx];
+        DCHECK(column.materialize_column);
 
-        current_row->SetTuple(scan_node_->tuple_idx(), tuple);
-        // Evaluate the conjuncts and add the row to the batch
-        if (EvalConjuncts(current_row)) {
-          ++num_to_commit;
-          current_row = next_row(current_row);
-          tuple = next_tuple(tuple_byte_size_, tuple);
+        const char* col_start = reinterpret_cast<const char*>(
+            row_group_buffer_ + column.start_offset + column.buffer_pos);
+        const int field_len = column.current_field_len;
+        DCHECK_LE(col_start + field_len,
+            reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));
+
+        if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
+            false, false, row_batch->tuple_data_pool())) {
+          ReportColumnParseError(slot_desc, col_start, field_len);
+          error_in_row = true;
         }
       }
-      COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
-      RETURN_IF_ERROR(CommitRows(num_to_commit));
-      if (scan_node_->ReachedLimit()) return Status::OK();
+
+      if (error_in_row) {
+        error_in_row = false;
+        ErrorMsg msg(TErrorCode::GENERAL, Substitute("file: $0", stream_->filename()));
+        RETURN_IF_ERROR(state_->LogOrReturnError(msg));
+      }
+
+      current_row->SetTuple(scan_node_->tuple_idx(), tuple);
+      // Evaluate the conjuncts and add the row to the batch
+      if (EvalConjuncts(current_row)) {
+        ++num_to_commit;
+        current_row = next_row(current_row);
+        tuple = next_tuple(tuple_byte_size_, tuple);
+      }
+    }
+    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
+    RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch));
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+  }
+
+  if (row_pos_ == num_rows_) {
+    // We are done with this row group, pass along external buffers if necessary.
+    if (!reuse_row_group_buffer_) {
+      row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
+      row_group_buffer_size_ = 0;
     }
 
     // RCFiles don't end with syncs
-    if (stream_->eof()) return Status::OK();
+    if (stream_->eof()) {
+      eos_ = true;
+      return Status::OK();
+    }
 
     // Check for sync by looking for the marker that precedes syncs.
     int marker;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index 0330979..835d2a2 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -226,7 +226,7 @@
 namespace impala {
 
 struct HdfsFileDesc;
-class HdfsScanNode;
+class HdfsScanNodeBase;
 class TupleDescriptor;
 class Tuple;
 
@@ -236,7 +236,7 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   HdfsRCFileScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
   virtual ~HdfsRCFileScanner();
 
-  virtual Status Open(ScannerContext* context);
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
 
   void DebugString(int indentation_level, std::stringstream* out) const;
 
@@ -259,30 +259,28 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
 
   /// Implementation of superclass functions.
   virtual FileHeader* AllocateFileHeader();
-  virtual Status ReadFileHeader();
-  virtual Status InitNewRange();
-  virtual Status ProcessRange();
+  virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
+  virtual Status InitNewRange() WARN_UNUSED_RESULT;
+  virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
-  virtual THdfsFileFormat::type file_format() const {
-    return THdfsFileFormat::RC_FILE;
-  }
+  virtual THdfsFileFormat::type file_format() const { return THdfsFileFormat::RC_FILE; }
 
   /// Reads the RCFile Header Metadata section in the current file to determine the number
   /// of columns.  Other pieces of the metadata are ignored.
-  Status ReadNumColumnsMetadata();
+  Status ReadNumColumnsMetadata() WARN_UNUSED_RESULT;
 
   /// Reads the rowgroup header starting after the sync.
   /// Sets:
   ///   key_length_
   ///   compressed_key_length_
   ///   num_rows_
-  Status ReadRowGroupHeader();
+  Status ReadRowGroupHeader() WARN_UNUSED_RESULT;
 
   /// Read the rowgroup key buffers, decompress if necessary.
   /// The "keys" are really the lengths for the column values.  They
   /// are read here and then used to decode the values in the column buffer.
   /// Calls GetCurrentKeyBuffer for each column to process the key data.
-  Status ReadKeyBuffers();
+  Status ReadKeyBuffers() WARN_UNUSED_RESULT;
 
   /// Process the current key buffer.
   /// Inputs:
@@ -301,7 +299,7 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   /// Read the rowgroup column buffers
   /// Sets:
   ///   column_buffer_: Fills the buffer with either file data or decompressed data.
-  Status ReadColumnBuffers();
+  Status ReadColumnBuffers() WARN_UNUSED_RESULT;
 
   /// Look at the next field in the specified column buffer
   /// Input:
@@ -311,22 +309,19 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   ///   key_buf_pos_[col_idx]
   ///   cur_field_length_rep_[col_idx]
   ///   cur_field_length_[col_idx]
-  Status NextField(int col_idx);
+  Status NextField(int col_idx) WARN_UNUSED_RESULT;
 
   /// Read a row group (except for the sync marker and sync) into buffers.
   /// Calls:
-  ///   ReadRowGroupHeader
-  ///   ReadKeyBuffers
-  ///   ReadColumnBuffers
-  Status ReadRowGroup();
-
-  /// Reset state for a new row group. Can fail if allocating the next row batch fails.
-  Status ResetRowGroup();
+  ///   ReadRowGroupHeader()
+  ///   ReadKeyBuffers()
+  ///   ReadColumnBuffers()
+  Status StartRowGroup() WARN_UNUSED_RESULT;
 
   /// Move to next row. Calls NextField on each column that we are reading.
   /// Modifies:
   ///   row_pos_
-  Status NextRow();
+  Status NextRow() WARN_UNUSED_RESULT;
 
   enum Version {
     SEQ6,     // Version for sequence file and pre hive-0.9 rc files
@@ -382,36 +377,36 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   std::vector<uint8_t> key_buffer_;
 
   /// number of rows in this rowgroup object
-  int num_rows_;
+  int num_rows_ = 0;
 
   /// Current row position in this rowgroup.
   /// This value is incremented each time NextRow() is called.
-  int row_pos_;
+  int row_pos_ = 0;
 
   /// Size of the row group's key buffers.
   /// Read from the row group header.
-  int key_length_;
+  int key_length_ = -1;
 
   /// Compressed size of the row group's key buffers.
   /// Read from the row group header.
-  int compressed_key_length_;
+  int compressed_key_length_ = -1;
 
   /// If true, the row_group_buffer_ can be reused across row groups, otherwise,
   /// it (more specifically the data_buffer_pool_ that allocated the row_group_buffer_)
   /// must be attached to the row batch.
-  bool reuse_row_group_buffer_;
+  bool reuse_row_group_buffer_ = false;
 
   /// Buffer containing the entire row group.  We allocate a buffer for the entire
   /// row group, skipping non-materialized columns.
-  uint8_t* row_group_buffer_;
+  uint8_t* row_group_buffer_ = nullptr;
 
   /// Sum of the bytes lengths of the materialized columns in the current row group.  This
   /// is the number of valid bytes in row_group_buffer_.
-  int row_group_length_;
+  int64_t row_group_length_ = 0;
 
   /// This is the allocated size of 'row_group_buffer_'.  'row_group_buffer_' is reused
   /// across row groups and will grow as necessary.
-  int row_group_buffer_size_;
+  int64_t row_group_buffer_size_ = 0;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 7e55418..469c00e 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -687,8 +687,7 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
   if (status.ok()) {
     status = scanner->get()->Open(context);
     if (!status.ok()) {
-      RowBatch* batch = (HasRowBatchQueue()) ? scanner->get()->batch() : NULL;
-      scanner->get()->Close(batch);
+      scanner->get()->Close(nullptr);
       scanner->reset();
     }
   } else {
@@ -809,6 +808,10 @@ void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const
   }
 }
 
+void HdfsScanNodeBase::TransferToScanNodePool(MemPool* pool) {
+  scan_node_pool_->AcquireData(pool, false);
+}
+
 void HdfsScanNodeBase::UpdateHdfsSplitStats(
     const vector<TScanRangeParams>& scan_range_params_list,
     PerVolumeStats* per_volume_stats) {