Posted to commits@impala.apache.org by ta...@apache.org on 2017/07/03 00:29:12 UTC

[2/6] incubator-impala git commit: IMPALA-3905: HdfsScanner::GetNext() for Avro, RC, and Seq scans.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7af5d5d..11e5718 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -231,15 +231,15 @@ class HdfsScanNodeBase : public ScanNode {
   Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals,
       MemPool* pool, RuntimeState* state) const;
 
-  /// Returns the file desc for 'filename'.  Returns NULL if filename is invalid.
+  /// Returns the file desc for 'filename'.  Returns nullptr if filename is invalid.
   HdfsFileDesc* GetFileDesc(const std::string& filename);
 
   /// Sets the scanner specific metadata for 'filename'. Scanners can use this to store
   /// file header information. Thread safe.
   void SetFileMetadata(const std::string& filename, void* metadata);
 
-  /// Returns the scanner specific metadata for 'filename'. Returns NULL if there is no
-  /// metadata. Thread safe.
+  /// Returns the scanner specific metadata for 'filename'. Returns nullptr if there is
+  /// no metadata. Thread safe.
   void* GetFileMetadata(const std::string& filename);
 
   /// Called by scanners when a range is complete. Used to record progress.
@@ -267,6 +267,9 @@ class HdfsScanNodeBase : public ScanNode {
     return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
   }
 
+  /// Transfers all memory from 'pool' to 'scan_node_pool_'.
+  virtual void TransferToScanNodePool(MemPool* pool);
+
   /// map from volume id to <number of split, per volume split lengths>
   /// TODO: move this into some global .h, no need to include this file just for this
   /// typedef
@@ -329,7 +332,7 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
   /// the partition columns for that partition materialized. Used to filter files and scan
-  /// ranges on partition-column filters. Populated in Prepare().
+  /// ranges on partition-column filters. Populated in Open().
   boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
 
   /// Descriptor for the hdfs table, including partition and format metadata.
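
The virtual TransferToScanNodePool() hook declared above lets the multi-threaded subclass keep its locking while the base class performs the actual memory transfer (see the hdfs-scan-node.cc hunk further down, where the subclass now delegates to the base implementation). A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than Impala's MemPool and scan node classes:

    #include <memory>
    #include <mutex>
    #include <utility>
    #include <vector>

    // Stand-in for MemPool: AcquireData() moves every chunk from 'src' into this pool.
    struct Pool {
      std::vector<std::unique_ptr<char[]>> chunks;
      void AcquireData(Pool* src) {
        for (auto& c : src->chunks) chunks.push_back(std::move(c));
        src->chunks.clear();
      }
    };

    struct ScanNodeBase {
      virtual ~ScanNodeBase() = default;
      // Base implementation: transfer all memory from 'pool' to 'scan_node_pool_'.
      virtual void TransferToScanNodePool(Pool* pool) { scan_node_pool_.AcquireData(pool); }
      Pool scan_node_pool_;
    };

    // The multi-threaded subclass only adds locking and reuses the base transfer.
    struct ScanNode : ScanNodeBase {
      std::mutex lock_;
      void TransferToScanNodePool(Pool* pool) override {
        std::lock_guard<std::mutex> l(lock_);
        ScanNodeBase::TransferToScanNodePool(pool);
      }
    };

    int main() {
      ScanNode node;
      Pool scratch;
      scratch.chunks.push_back(std::unique_ptr<char[]>(new char[64]));
      node.TransferToScanNodePool(&scratch);  // 'scratch' is empty afterwards
      return 0;
    }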

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 7829d47..4ce12fe 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -40,9 +40,10 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
   HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~HdfsScanNodeMt();
 
-  virtual Status Prepare(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
+      WARN_UNUSED_RESULT;
   virtual void Close(RuntimeState* state);
 
   virtual bool HasRowBatchQueue() const { return false; }
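
The WARN_UNUSED_RESULT annotations added to these Status-returning overrides make the compiler warn when a caller silently drops the returned Status. The macro is defined elsewhere in the tree; the stand-in below (assumed to be the usual GCC/Clang attribute form) shows the effect:

    #include <cstdio>

    #if defined(__GNUC__) || defined(__clang__)
    #define WARN_UNUSED_RESULT __attribute__((warn_unused_result))
    #else
    #define WARN_UNUSED_RESULT
    #endif

    struct Status { bool ok; };

    Status Open() WARN_UNUSED_RESULT;       // the declaration carries the attribute
    Status Open() { return Status{true}; }

    int main() {
      // Open();                            // with GCC/Clang this would warn: result ignored
      Status s = Open();                    // consuming the Status silences the warning
      std::printf("open ok: %d\n", s.ok ? 1 : 0);
      return 0;
    }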

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 4cbf503..1ab87ac 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -17,6 +17,7 @@
 
 #include "exec/hdfs-scan-node.h"
 
+#include <memory>
 #include <sstream>
 
 #include "common/logging.h"
@@ -111,10 +112,10 @@ Status HdfsScanNode::GetNextInternal(
     return Status::OK();
   }
   *eos = false;
-  RowBatch* materialized_batch = materialized_row_batches_->GetBatch();
+  unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
   if (materialized_batch != NULL) {
     num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers());
-    row_batch->AcquireState(materialized_batch);
+    row_batch->AcquireState(materialized_batch.get());
     // Update the number of materialized rows now instead of when they are materialized.
     // This means that scanners might process and queue up more rows than are necessary
     // for the limit case but we want to avoid the synchronized writes to
@@ -132,7 +133,7 @@ Status HdfsScanNode::GetNextInternal(
       SetDone();
     }
     DCHECK_EQ(materialized_batch->num_io_buffers(), 0);
-    delete materialized_batch;
+    materialized_batch.reset();
     return Status::OK();
   }
   // The RowBatchQueue was shutdown either because all scan ranges are complete or a
@@ -248,12 +249,12 @@ void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
 
 void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
   unique_lock<mutex> l(lock_);
-  scan_node_pool_->AcquireData(pool, false);
+  HdfsScanNodeBase::TransferToScanNodePool(pool);
 }
 
-void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
-  InitNullCollectionValues(row_batch);
-  materialized_row_batches_->AddBatch(row_batch);
+void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
+  InitNullCollectionValues(row_batch.get());
+  materialized_row_batches_->AddBatch(move(row_batch));
 }
 
 Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
@@ -541,9 +542,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     VLOG_QUERY << ss.str();
   }
 
-  // Transfer the remaining resources to the final row batch (if any) and add it to
-  // the row batch queue.
-  scanner->Close(scanner->batch());
+  // Transfer remaining resources to a final batch and add it to the row batch queue.
+  scanner->Close();
   return status;
 }
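
Wrapping the dequeued batch in std::unique_ptr removes the explicit delete seen above: the output batch takes over the backing memory via AcquireState(), and the source batch is destroyed when the pointer is reset or goes out of scope. A simplified, self-contained sketch of that hand-off (the Batch type and queue below are stand-ins, not the real RowBatch/RowBatchQueue):

    #include <cstdio>
    #include <memory>
    #include <queue>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for RowBatch: owns some row data that can be moved to another batch.
    struct Batch {
      std::vector<std::string> rows;
      void AcquireState(Batch* src) { rows = std::move(src->rows); }
    };

    int main() {
      std::queue<std::unique_ptr<Batch>> materialized;  // the queue owns the batches

      auto b = std::make_unique<Batch>();
      b->rows = {"row1", "row2"};
      materialized.push(std::move(b));

      // Consumer side: take ownership off the queue, move the state into the
      // caller's output batch, then drop the source batch.
      Batch output;
      std::unique_ptr<Batch> dequeued = std::move(materialized.front());
      materialized.pop();
      output.AcquireState(dequeued.get());
      dequeued.reset();  // replaces the old explicit 'delete materialized_batch'

      std::printf("output rows: %zu\n", output.rows.size());
      return 0;
    }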
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 41647da..b056e2e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -20,6 +20,7 @@
 #define IMPALA_EXEC_HDFS_SCAN_NODE_H_
 
 #include <map>
+#include <memory>
 #include <stdint.h>
 #include <vector>
 
@@ -66,10 +67,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~HdfsScanNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status Prepare(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
+      WARN_UNUSED_RESULT;
   virtual void Close(RuntimeState* state);
 
   virtual bool HasRowBatchQueue() const { return true; }
@@ -78,12 +80,12 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
   /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
   virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
-      int num_files_queued);
+      int num_files_queued) WARN_UNUSED_RESULT;
 
   /// Adds a materialized row batch for the scan node.  This is called from scanner
   /// threads.
   /// This function will block if materialized_row_batches_ is full.
-  void AddMaterializedRowBatch(RowBatch* row_batch);
+  void AddMaterializedRowBatch(std::unique_ptr<RowBatch> row_batch);
 
   /// Called by scanners when a range is complete. Used to record progress and set done_.
   /// This *must* only be called after a scanner has completely finished its
@@ -93,8 +95,8 @@ class HdfsScanNode : public HdfsScanNodeBase {
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
       const std::vector<THdfsCompression::type>& compression_type);
 
-  /// Acquires all allocations from pool into scan_node_pool_. Thread-safe.
-  void TransferToScanNodePool(MemPool* pool);
+  /// Transfers all memory from 'pool' to 'scan_node_pool_'.
+  virtual void TransferToScanNodePool(MemPool* pool);
 
  private:
   /// Released when initial ranges are issued in the first call to GetNext().
@@ -164,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      DiskIoMgr::ScanRange* scan_range);
+      DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
 
   /// Returns true if there is enough memory (against the mem tracker limits) to
   /// have a scanner thread.
@@ -175,7 +177,8 @@ class HdfsScanNode : public HdfsScanNodeBase {
   bool EnoughMemoryForScannerThread(bool new_thread);
 
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
-  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
+      WARN_UNUSED_RESULT;
 
   /// sets done_ to true and triggers threads to cleanup. Cannot be called with
   /// any locks taken. Calling it repeatedly ignores subsequent calls.

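AddMaterializedRowBatch() now takes its batch by std::unique_ptr, so a scanner thread hands ownership to the queue and cannot touch the pointer afterwards; the comment above also notes that the call blocks when materialized_row_batches_ is full. A small sketch of a bounded, ownership-taking queue with that shape (an illustration only, not Impala's RowBatchQueue):

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <memory>
    #include <mutex>

    struct Batch { int num_rows = 0; };

    // Bounded queue that takes ownership of enqueued batches and blocks when full.
    class BatchQueue {
     public:
      explicit BatchQueue(size_t capacity) : capacity_(capacity) {}

      void AddBatch(std::unique_ptr<Batch> batch) {
        std::unique_lock<std::mutex> l(lock_);
        not_full_.wait(l, [&] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(batch));
        not_empty_.notify_one();
      }

      std::unique_ptr<Batch> GetBatch() {
        std::unique_lock<std::mutex> l(lock_);
        not_empty_.wait(l, [&] { return !queue_.empty(); });
        std::unique_ptr<Batch> batch = std::move(queue_.front());
        queue_.pop_front();
        not_full_.notify_one();
        return batch;
      }

     private:
      const size_t capacity_;
      std::mutex lock_;
      std::condition_variable not_full_, not_empty_;
      std::deque<std::unique_ptr<Batch>> queue_;
    };

    int main() {
      BatchQueue queue(4);
      auto batch = std::make_unique<Batch>();
      batch->num_rows = 10;
      queue.AddBatch(std::move(batch));          // the producer gives up ownership here
      std::unique_ptr<Batch> got = queue.GetBatch();
      std::printf("rows: %d\n", got->num_rows);
      return 0;
    }
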
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 991c40f..0670f81 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -60,44 +60,16 @@ const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
 HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : scan_node_(scan_node),
       state_(state),
-      context_(NULL),
-      stream_(NULL),
-      eos_(false),
-      is_closed_(false),
       expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
-      conjunct_evals_(NULL),
       template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
-      template_tuple_(NULL),
       tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
-      tuple_(NULL),
-      batch_(NULL),
-      tuple_mem_(NULL),
-      parse_status_(Status::OK()),
-      decompression_type_(THdfsCompression::NONE),
-      data_buffer_pool_(new MemPool(scan_node->mem_tracker())),
-      decompress_timer_(NULL),
-      write_tuples_fn_(NULL) {
+      data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
 }
 
 HdfsScanner::HdfsScanner()
-    : scan_node_(NULL),
-      state_(NULL),
-      context_(NULL),
-      stream_(NULL),
-      eos_(false),
-      is_closed_(false),
-      conjunct_evals_(NULL),
-      template_tuple_pool_(NULL),
-      template_tuple_(NULL),
-      tuple_byte_size_(-1),
-      tuple_(NULL),
-      batch_(NULL),
-      tuple_mem_(NULL),
-      parse_status_(Status::OK()),
-      decompression_type_(THdfsCompression::NONE),
-      data_buffer_pool_(NULL),
-      decompress_timer_(NULL),
-      write_tuples_fn_(NULL) {
+    : scan_node_(nullptr),
+      state_(nullptr),
+      tuple_byte_size_(0) {
   DCHECK(TestInfo::is_test());
 }
 
@@ -142,9 +114,31 @@ Status HdfsScanner::Open(ScannerContext* context) {
   return Status::OK();
 }
 
-void HdfsScanner::Close(RowBatch* row_batch) {
+Status HdfsScanner::ProcessSplit() {
+  DCHECK(scan_node_->HasRowBatchQueue());
+  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
+  do {
+    unique_ptr<RowBatch> batch = std::make_unique<RowBatch>(scan_node_->row_desc(),
+        state_->batch_size(), scan_node_->mem_tracker());
+    Status status = GetNextInternal(batch.get());
+    // Always add batch to the queue because it may contain data referenced by previously
+    // appended batches.
+    scan_node->AddMaterializedRowBatch(move(batch));
+    RETURN_IF_ERROR(status);
+  } while (!eos_ && !scan_node_->ReachedLimit());
+  return Status::OK();
+}
+
+void HdfsScanner::Close() {
+  DCHECK(scan_node_->HasRowBatchQueue());
+  RowBatch* final_batch = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+      scan_node_->mem_tracker());
+  Close(final_batch);
+}
+
+void HdfsScanner::CloseInternal() {
   DCHECK(!is_closed_);
-  if (decompressor_.get() != NULL) {
+  if (decompressor_.get() != nullptr) {
     decompressor_->Close();
     decompressor_.reset();
   }
@@ -153,7 +147,7 @@ void HdfsScanner::Close(RowBatch* row_batch) {
   }
   expr_mem_pool_->FreeAll();
   obj_pool_.Clear();
-  stream_ = NULL;
+  stream_ = nullptr;
   context_->ClearStreams();
   is_closed_ = true;
 }
@@ -179,26 +173,6 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
   return Status::OK();
 }
 
-Status HdfsScanner::StartNewRowBatch() {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-      scan_node_->mem_tracker());
-  int64_t tuple_buffer_size;
-  RETURN_IF_ERROR(
-      batch_->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
-  return Status::OK();
-}
-
-int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  DCHECK(batch_ != NULL);
-  DCHECK_GT(batch_->capacity(), batch_->num_rows());
-  *pool = batch_->tuple_data_pool();
-  *tuple_mem = reinterpret_cast<Tuple*>(tuple_mem_);
-  *tuple_row_mem = batch_->GetRow(batch_->AddRow());
-  return batch_->capacity() - batch_->num_rows();
-}
-
 Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool,
     Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) {
   int num_tuples;
@@ -210,10 +184,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool
   return Status::OK();
 }
 
-Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch) {
-  DCHECK(batch_ != NULL || !scan_node_->HasRowBatchQueue());
-  DCHECK(batch_ == row_batch || !scan_node_->HasRowBatchQueue());
-  DCHECK(!enqueue_if_full || scan_node_->HasRowBatchQueue());
+Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
   DCHECK_LE(num_rows, row_batch->capacity() - row_batch->num_rows());
   row_batch->CommitRows(num_rows);
   tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows;
@@ -224,10 +195,6 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row
   // if no rows passed predicates.
   if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
     context_->ReleaseCompletedResources(row_batch, /* done */ false);
-    if (enqueue_if_full) {
-      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
-      RETURN_IF_ERROR(StartNewRowBatch());
-    }
   }
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
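
The new base-class ProcessSplit() above allocates a fresh row batch per iteration, fills it via GetNextInternal(), and enqueues it before checking the status, since even a partially filled batch may own memory referenced by batches queued earlier. A stripped-down, self-contained sketch of that control flow (toy types, not the real Status/RowBatch API):

    #include <cstdio>
    #include <memory>
    #include <vector>

    struct Status {
      bool ok = true;
      static Status OK() { return Status{}; }
    };

    struct Batch { std::vector<int> rows; };

    struct ToyScanner {
      bool eos_ = false;
      int next_ = 0;

      // Fills 'batch' with up to three rows; flips eos_ when the input runs out.
      Status GetNextInternal(Batch* batch) {
        for (int i = 0; i < 3 && next_ < 7; ++i) batch->rows.push_back(next_++);
        if (next_ == 7) eos_ = true;
        return Status::OK();
      }

      Status ProcessSplit(std::vector<std::unique_ptr<Batch>>* queue) {
        do {
          auto batch = std::make_unique<Batch>();
          Status status = GetNextInternal(batch.get());
          // Enqueue unconditionally: the batch may back memory referenced by
          // previously enqueued batches, so it must outlive them either way.
          queue->push_back(std::move(batch));
          if (!status.ok) return status;
        } while (!eos_);
        return Status::OK();
      }
    };

    int main() {
      ToyScanner scanner;
      std::vector<std::unique_ptr<Batch>> queue;
      scanner.ProcessSplit(&queue);
      std::printf("queued batches: %zu\n", queue.size());
      return 0;
    }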

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index cb9dfeb..f2f3407 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -22,33 +22,27 @@
 #include <vector>
 #include <memory>
 #include <stdint.h>
-#include <boost/regex.hpp>
 #include <boost/scoped_ptr.hpp>
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
+#include "common/status.h"
 #include "exec/hdfs-scan-node-base.h"
-#include "exec/scan-node.h"
 #include "exec/scanner-context.h"
-#include "runtime/disk-io-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple.h"
 
 namespace impala {
 
+class Codec;
 class CollectionValueBuilder;
 class Compression;
-class DescriptorTbl;
 class Expr;
 class HdfsPartitionDescriptor;
 class MemPool;
-class SlotDescriptor;
-class Status;
 class TextConverter;
 class TupleDescriptor;
-class TPlanNode;
-class TScanRange;
-class Codec;
+class SlotDescriptor;
 
 /// Intermediate structure used for two pass parsing approach. In the first pass,
 /// the FieldLocation structs are filled out and contain where all the fields start and
@@ -118,7 +112,7 @@ class HdfsScanner {
   virtual ~HdfsScanner();
 
   /// One-time initialisation of state that is constant across scan ranges.
-  virtual Status Open(ScannerContext* context);
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
 
   /// Returns the next row batch from this scanner's split.
   /// Recoverable errors are logged to the runtime state. Only returns a non-OK status
@@ -127,7 +121,7 @@ class HdfsScanner {
   /// The memory referenced by the tuples is valid until this or any subsequently
   /// returned batch is reset or destroyed.
   /// Only valid to call if the parent scan node is single-threaded.
-  Status GetNext(RowBatch* row_batch) {
+  Status GetNext(RowBatch* row_batch) WARN_UNUSED_RESULT {
     DCHECK(!scan_node_->HasRowBatchQueue());
     return GetNextInternal(row_batch);
   }
@@ -136,14 +130,26 @@ class HdfsScanner {
   /// initialized with the split data (e.g. template tuple, partition descriptor, etc).
   /// This function should only return on error or end of scan range.
   /// Only valid to call if the parent scan node is multi-threaded.
-  virtual Status ProcessSplit() = 0;
+  virtual Status ProcessSplit() WARN_UNUSED_RESULT;
+
+  /// Creates a new row batch and transfers the ownership of memory backing returned
+  /// tuples to it by calling Close(RowBatch). That last batch is added to the row batch
+  /// queue. Only valid to call if HasRowBatchQueue().
+  void Close();
 
   /// Transfers the ownership of memory backing returned tuples such as IO buffers
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
   /// This function is not idempotent and must only be called once.
-  virtual void Close(RowBatch* row_batch);
+  virtual void Close(RowBatch* row_batch) = 0;
+
+  /// Helper function that frees resources common to all scanner subclasses like the
+  /// 'decompressor_', 'context_', 'obj_pool_', etc. Should only be called once the last
+  /// row batch has been attached to the row batch queue (if applicable) to avoid freeing
+  /// memory that might be referenced by the last batch.
+  /// Only valid to call if 'is_closed_' is false. Sets 'is_closed_' to true.
+  void CloseInternal();
 
   /// Only valid to call if the parent scan node is single-threaded.
   bool eos() const {
@@ -151,12 +157,6 @@ class HdfsScanner {
     return eos_;
   }
 
-  /// Only valid to call if the parent scan node is multi-threaded.
-  RowBatch* batch() const {
-    DCHECK(scan_node_->HasRowBatchQueue());
-    return batch_;
-  }
-
   /// Scanner subclasses must implement these static functions as well.  Unfortunately,
   /// c++ does not allow static virtual functions.
 
@@ -173,7 +173,7 @@ class HdfsScanner {
   /// should be issued to the io mgr.  This is one range for the metadata and one
   /// range for each column, for each split.
   /// This function is how scanners can pick their strategy.
-  /// void IssueInitialRanges(HdfsScanNode* scan_node,
+  /// void IssueInitialRanges(HdfsScanNodeBase* scan_node,
   ///                         const std::vector<HdfsFileDesc*>& files);
 
   /// Codegen all functions for this scanner.  The codegen'd function is specific to
@@ -191,20 +191,19 @@ class HdfsScanner {
   RuntimeState* state_;
 
   /// Context for this scanner
-  ScannerContext* context_;
+  ScannerContext* context_ = nullptr;
 
   /// Object pool for objects with same lifetime as scanner.
   ObjectPool obj_pool_;
 
   /// The first stream for context_
-  ScannerContext::Stream* stream_;
+  ScannerContext::Stream* stream_ = nullptr;
 
   /// Set if this scanner has processed all ranges and will not produce more rows.
-  /// Only relevant when calling the GetNext() interface.
-  bool eos_;
+  bool eos_ = false;
 
   /// Starts as false and is set to true in Close().
-  bool is_closed_;
+  bool is_closed_ = false;
 
   /// MemPool used for expression evaluators in this scanner. Need to be local
   /// to each scanner as MemPool is not thread safe.
@@ -217,7 +216,7 @@ class HdfsScanner {
 
   // Convenience reference to conjuncts_evals_map_[scan_node_->tuple_idx()] for
   // scanners that do not support nested types.
-  const std::vector<ScalarExprEvaluator*>* conjunct_evals_;
+  const std::vector<ScalarExprEvaluator*>* conjunct_evals_ = nullptr;
 
   // Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map().
   typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap;
@@ -242,23 +241,16 @@ class HdfsScanner {
 
   /// Convenience variable set to the top-level template tuple
   /// (i.e. template_tuple_map_[scan_node_->tuple_desc()]).
-  Tuple* template_tuple_;
+  Tuple* template_tuple_ = nullptr;
 
   /// Fixed size of each top-level tuple, in bytes
   const int32_t tuple_byte_size_;
 
-  /// Current tuple pointer into tuple_mem_.
-  Tuple* tuple_;
+  /// Current tuple pointer into 'tuple_mem_'.
+  Tuple* tuple_ = nullptr;
 
-  /// The current row batch being populated. Creating new row batches, attaching context
-  /// resources, and handing off to the scan node is handled by this class in CommitRows(),
-  /// but AttachPool() must be called by scanner subclasses to attach any memory allocated
-  /// by that subclass. All row batches created by this class are transferred to the scan
-  /// node (i.e., all batches are ultimately owned by the scan node).
-  RowBatch* batch_;
-
-  /// The tuple memory of batch_.
-  uint8_t* tuple_mem_;
+  /// The tuple memory backing 'tuple_'.
+  uint8_t* tuple_mem_ = nullptr;
 
   /// Helper class for converting text to other types;
   boost::scoped_ptr<TextConverter> text_converter_;
@@ -267,31 +259,31 @@ class HdfsScanner {
   /// This significantly minimizes the cross compile dependencies for llvm since status
   /// objects inline a bunch of string functions.  Also, status objects aren't extremely
   /// cheap to create and destroy.
-  Status parse_status_;
+  Status parse_status_ = Status::OK();
 
   /// Decompressor class to use, if any.
   boost::scoped_ptr<Codec> decompressor_;
 
   /// The most recently used decompression type.
-  THdfsCompression::type decompression_type_;
+  THdfsCompression::type decompression_type_ = THdfsCompression::NONE;
 
   /// Pool to allocate per data block memory.  This should be used with the
   /// decompressor and any other per data block allocations.
   boost::scoped_ptr<MemPool> data_buffer_pool_;
 
   /// Time spent decompressing bytes.
-  RuntimeProfile::Counter* decompress_timer_;
+  RuntimeProfile::Counter* decompress_timer_ = nullptr;
 
   /// Matching typedef for WriteAlignedTuples for codegen.  Refer to comments for
   /// that function.
   typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, int, FieldLocation*,
       int, int, int, int);
   /// Jitted write tuples function pointer.  Null if codegen is disabled.
-  WriteTuplesFn write_tuples_fn_;
+  WriteTuplesFn write_tuples_fn_ = nullptr;
 
   /// Implements GetNext(). Should be overridden by subclasses.
   /// Only valid to call if the parent scan node is multi-threaded.
-  virtual Status GetNextInternal(RowBatch* row_batch) {
+  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT {
     DCHECK(false) << "GetNextInternal() not implemented for this scanner type.";
     return Status::OK();
   }
@@ -301,25 +293,10 @@ class HdfsScanner {
   /// - type - type for this scanner
   /// - scanner_name - debug string name for this scanner (e.g. HdfsTextScanner)
   Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
-      THdfsFileFormat::type type, const std::string& scanner_name);
-
-  /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
-  /// Only valid to call if the parent scan node is multi-threaded.
-  Status StartNewRowBatch();
+      THdfsFileFormat::type type, const std::string& scanner_name) WARN_UNUSED_RESULT;
 
   /// Reset internal state for a new scan range.
-  virtual Status InitNewRange() = 0;
-
-  /// Gets memory for outputting tuples into batch_.
-  ///  *pool is the mem pool that should be used for memory allocated for those tuples.
-  ///  *tuple_mem should be the location to output tuples, and
-  ///  *tuple_row_mem for outputting tuple rows.
-  /// Returns the maximum number of tuples/tuple rows that can be output (before the
-  /// current row batch is complete and a new one is allocated).
-  /// Memory returned from this call is invalidated after calling CommitRows().
-  /// Callers must call GetMemory() again after calling this function.
-  /// Only valid to call if the parent scan node is multi-threaded.
-  int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem);
+  virtual Status InitNewRange() WARN_UNUSED_RESULT = 0;
 
   /// Gets memory for outputting tuples into the CollectionValue being constructed via
   /// 'builder'. If memory limit is exceeded, an error status is returned. Otherwise,
@@ -330,40 +307,13 @@ class HdfsScanner {
   /// the next tuple. This also means its unnecessary to call
   /// (*tuple_row_mem)->SetTuple().
   Status GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool,
-      Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows);
+      Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) WARN_UNUSED_RESULT;
 
   /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly.
   /// Attaches completed resources from 'context_' to 'row_batch' if necessary.
-  /// Frees local expr allocations.
-  /// If 'enqueue_if_full' is true and 'row_batch' is at capacity after committing the
-  /// rows, then 'row_batch' is added to the queue, and a new batch is created with
-  /// StartNewRowBatch(). It is only valid to pass true for 'enqueue_if_full' if the
-  /// parent parent scan node is multi-threaded.
-  /// Returns non-OK if 'context_' is cancelled or the query status in 'state_' is
-  /// non-OK.
-  Status CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch);
-
-  /// Calls the above CommitRows() passing true for 'queue_if_full', and 'batch_' as the
-  /// row batch. Only valid to call if the parent scan node is multi-threaded.
-  Status CommitRows(int num_rows) {
-    DCHECK(scan_node_->HasRowBatchQueue());
-    return CommitRows(num_rows, true, batch_);
-  }
-
-  /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the row batch
-  /// will be committed. 'commit_batch' should be true if the attached pool is expected
-  /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage.
-  /// Can return an error status if 'commit_batch' is true and allocating the next
-  /// batch fails, or if the query hit an error or is cancelled. Only valid to call if
-  /// the parent scan node is multi-threaded.
-  Status AttachPool(MemPool* pool, bool commit_batch) {
-    DCHECK(scan_node_->HasRowBatchQueue());
-    DCHECK(batch_ != NULL);
-    DCHECK(pool != NULL);
-    batch_->tuple_data_pool()->AcquireData(pool, false);
-    if (commit_batch) RETURN_IF_ERROR(CommitRows(0));
-    return Status::OK();
-  }
+  /// Frees local expr allocations. Returns non-OK if 'context_' is cancelled or the
+  /// query status in 'state_' is non-OK.
+  Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Convenience function for evaluating conjuncts using this scanner's ScalarExprEvaluators.
   /// This must always be inlined so we can correctly replace the call to
@@ -395,8 +345,8 @@ class HdfsScanner {
   /// Update the decompressor_ object given a compression type or codec name. Depending on
   /// the old compression type and the new one, it may close the old decompressor and/or
   /// create a new one of different type.
-  Status UpdateDecompressor(const THdfsCompression::type& compression);
-  Status UpdateDecompressor(const std::string& codec);
+  Status UpdateDecompressor(const THdfsCompression::type& compression) WARN_UNUSED_RESULT;
+  Status UpdateDecompressor(const std::string& codec) WARN_UNUSED_RESULT;
 
   /// Utility function to report parse errors for each field.
   /// If errors[i] is nonzero, fields[i] had a parse error.
@@ -405,10 +355,10 @@ class HdfsScanner {
   /// This is called from WriteAlignedTuples.
   bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors);
 
-  /// Triggers debug action of the scan node. This is currently used by parquet column
-  /// readers to exercise various failure paths in parquet scanner. Returns the status
+  /// Triggers debug action of the scan node. This is currently used by Parquet column
+  /// readers to exercise various failure paths in Parquet scanner. Returns the status
   /// returned by the scan node's TriggerDebugAction().
-  Status ScannerDebugAction() {
+  Status ScannerDebugAction() WARN_UNUSED_RESULT {
     return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER);
   }
 
@@ -435,21 +385,23 @@ class HdfsScanner {
   /// TODO: revisit this
   bool WriteCompleteTuple(MemPool* pool, FieldLocation* fields, Tuple* tuple,
       TupleRow* tuple_row, Tuple* template_tuple, uint8_t* error_fields,
-      uint8_t* error_in_row);
+      uint8_t* error_in_row) WARN_UNUSED_RESULT;
 
   /// Codegen function to replace WriteCompleteTuple. Should behave identically
   /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
   /// if codegen was successful or NULL otherwise.
   static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
       const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** write_complete_tuple_fn);
+      llvm::Function** write_complete_tuple_fn)
+      WARN_UNUSED_RESULT;
 
   /// Codegen function to replace WriteAlignedTuples.  WriteAlignedTuples is cross
   /// compiled to IR.  This function loads the precompiled IR function, modifies it,
   /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
   /// successful or NULL otherwise.
   static Status CodegenWriteAlignedTuples(HdfsScanNodeBase*, LlvmCodeGen*,
-      llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
+      llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn)
+      WARN_UNUSED_RESULT;
 
   /// Report parse error for column @ desc.   If abort_on_error is true, sets
   /// parse_status_ to the error message.

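Most HdfsScanner members now carry C++11 in-class default initializers (context_ = nullptr, eos_ = false, and so on), which is what lets the long constructor initializer lists in hdfs-scanner.cc above shrink: every constructor inherits the defaults and only overrides what it needs. A tiny illustration of the idiom, with invented names:

    #include <cstdio>

    struct Scanner {
      // In-class defaults apply to every constructor that does not override them.
      int* stream = nullptr;
      bool eos = false;
      int tuple_byte_size = 0;

      Scanner() = default;                                             // test-only constructor
      explicit Scanner(int byte_size) : tuple_byte_size(byte_size) {}  // "real" constructor
    };

    int main() {
      Scanner s(24);
      std::printf("eos=%d size=%d stream_null=%d\n", s.eos ? 1 : 0,
                  s.tuple_byte_size, s.stream == nullptr ? 1 : 0);
      return 0;
    }
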
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 355a554..9b66432 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -42,10 +42,7 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
 HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNodeBase* scan_node,
-    RuntimeState* state)
-    : BaseSequenceScanner(scan_node, state),
-      unparsed_data_buffer_(NULL),
-      num_buffered_records_in_compressed_block_(0) {
+    RuntimeState* state) : BaseSequenceScanner(scan_node, state) {
 }
 
 HdfsSequenceScanner::~HdfsSequenceScanner() {
@@ -54,22 +51,22 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 // Codegen for materialized parsed data into tuples.
 Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) {
-  *write_aligned_tuples_fn = NULL;
+  *write_aligned_tuples_fn = nullptr;
   DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != NULL);
+  DCHECK(codegen != nullptr);
   Function* write_complete_tuple_fn;
   RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
       &write_complete_tuple_fn));
-  DCHECK(write_complete_tuple_fn != NULL);
+  DCHECK(write_complete_tuple_fn != nullptr);
   RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
       write_aligned_tuples_fn));
-  DCHECK(*write_aligned_tuples_fn != NULL);
+  DCHECK(*write_aligned_tuples_fn != nullptr);
   return Status::OK();
 }
 
 Status HdfsSequenceScanner::InitNewRange() {
-  DCHECK(header_ != NULL);
+  DCHECK(header_ != nullptr);
   only_parsing_header_ = false;
 
   HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor();
@@ -168,23 +165,34 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
 // 3. Read the sync indicator and check the sync block
 // This mimics the technique for text.
 // This function only returns on error or when the entire scan range is complete.
-Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() {
+Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch) {
   DCHECK(header_->is_compressed);
 
-  while (!finished()) {
-    if (scan_node_->ReachedLimit()) return Status::OK();
-
+  if (num_buffered_records_in_compressed_block_ == 0) {
+    // We are reading a new compressed block. Pass the previous buffer pool bytes to the
+    // batch. We don't need them anymore.
+    if (!decompressor_->reuse_output_buffer()) {
+      row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
+      RETURN_IF_ERROR(CommitRows(0, row_batch));
+      if (row_batch->AtCapacity()) return Status::OK();
+    }
     // Step 1
     RETURN_IF_ERROR(ReadCompressedBlock());
     if (num_buffered_records_in_compressed_block_ < 0) return parse_status_;
+  }
 
-    // Step 2
-    while (num_buffered_records_in_compressed_block_ > 0) {
-      RETURN_IF_ERROR(ProcessDecompressedBlock());
-    }
+  // Step 2
+  while (num_buffered_records_in_compressed_block_ > 0) {
+    RETURN_IF_ERROR(ProcessDecompressedBlock(row_batch));
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+  }
 
-    // SequenceFiles don't end with syncs
-    if (stream_->eof()) return Status::OK();
+  if (num_buffered_records_in_compressed_block_ == 0) {
+    // SequenceFiles don't end with syncs.
+    if (stream_->eof()) {
+      eos_ = true;
+      return Status::OK();
+    }
 
     // Step 3
     int sync_indicator;
@@ -193,8 +201,8 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() {
       if (state_->LogHasSpace()) {
         stringstream ss;
         ss << "Expecting sync indicator (-1) at file offset "
-           << (stream_->file_offset() - sizeof(int)) << ".  "
-           << "Sync indicator found " << sync_indicator << ".";
+            << (stream_->file_offset() - sizeof(int)) << ".  "
+            << "Sync indicator found " << sync_indicator << ".";
         state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
       }
       return Status("Bad sync hash");
@@ -205,18 +213,17 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() {
   return Status::OK();
 }
 
-Status HdfsSequenceScanner::ProcessDecompressedBlock() {
-  MemPool* pool;
-  TupleRow* tuple_row;
-  int64_t max_tuples = GetMemory(&pool, &tuple_, &tuple_row);
+Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
+  int64_t max_tuples = row_batch->capacity() - row_batch->num_rows();
   int num_to_process = min(max_tuples, num_buffered_records_in_compressed_block_);
   num_buffered_records_in_compressed_block_ -= num_to_process;
 
+  TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow());
   if (scan_node_->materialized_slots().empty()) {
     // Handle case where there are no slots to materialize (e.g. count(*))
     num_to_process = WriteTemplateTuples(tuple_row, num_to_process);
     COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
-    RETURN_IF_ERROR(CommitRows(num_to_process));
+    RETURN_IF_ERROR(CommitRows(num_to_process, row_batch));
     return Status::OK();
   }
 
@@ -261,34 +268,32 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
   // Call jitted function if possible
   int tuples_returned;
-  if (write_tuples_fn_ != NULL) {
+  if (write_tuples_fn_ != nullptr) {
     // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string slots
     // and escape characters. TextConverter::WriteSlot() will be used instead.
     DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
         delimited_text_parser_->escape_char() == '\0');
     // last argument: seq always starts at record_location[0]
-    tuples_returned = write_tuples_fn_(this, pool, tuple_row,
-        batch_->row_byte_size(), field_locations_.data(), num_to_process,
+    tuples_returned = write_tuples_fn_(this, row_batch->tuple_data_pool(), tuple_row,
+        row_batch->row_byte_size(), field_locations_.data(), num_to_process,
         max_added_tuples, scan_node_->materialized_slots().size(), 0);
   } else {
-    tuples_returned = WriteAlignedTuples(pool, tuple_row,
-        batch_->row_byte_size(), field_locations_.data(), num_to_process,
+    tuples_returned = WriteAlignedTuples(row_batch->tuple_data_pool(), tuple_row,
+        row_batch->row_byte_size(), field_locations_.data(), num_to_process,
         max_added_tuples, scan_node_->materialized_slots().size(), 0);
   }
 
   if (tuples_returned == -1) return parse_status_;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
-  RETURN_IF_ERROR(CommitRows(tuples_returned));
+  RETURN_IF_ERROR(CommitRows(tuples_returned, row_batch));
   return Status::OK();
 }
 
-Status HdfsSequenceScanner::ProcessRange() {
-  num_buffered_records_in_compressed_block_ = 0;
-
+Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
   SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
   // Block compressed is handled separately to minimize function calls.
   if (seq_header->is_compressed && !seq_header->is_row_compressed) {
-    return ProcessBlockCompressedScanRange();
+    return ProcessBlockCompressedScanRange(row_batch);
   }
 
   // We count the time here since there is too much overhead to do
@@ -296,40 +301,30 @@ Status HdfsSequenceScanner::ProcessRange() {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
   int64_t num_rows_read = 0;
 
-  while (!finished()) {
+  const bool has_materialized_slots = !scan_node_->materialized_slots().empty();
+  while (!eos_) {
     DCHECK_GT(record_locations_.size(), 0);
-    // Get the next compressed or uncompressed record.
-    RETURN_IF_ERROR(
-        GetRecord(&record_locations_[0].record, &record_locations_[0].len));
+    TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
 
-    MemPool* pool;
-    TupleRow* tuple_row_mem;
-    int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
-    DCHECK_GT(max_tuples, 0);
-
-    // Parse the current record.
+    // Get the next compressed or uncompressed record and parse it.
+    RETURN_IF_ERROR(GetRecord(&record_locations_[0].record, &record_locations_[0].len));
     bool add_row = false;
-
-    // Parse the current record.
-    if (scan_node_->materialized_slots().size() != 0) {
+    if (has_materialized_slots) {
       char* col_start;
       uint8_t* record_start = record_locations_[0].record;
       int num_tuples = 0;
       int num_fields = 0;
       char* row_end_loc;
-      uint8_t error_in_row = false;
-
       RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(
           1, record_locations_[0].len, reinterpret_cast<char**>(&record_start),
           &row_end_loc, field_locations_.data(), &num_tuples, &num_fields, &col_start));
-      DCHECK(num_tuples == 1);
+      DCHECK_EQ(num_tuples, 1);
 
+      uint8_t error_in_row = false;
       uint8_t errors[num_fields];
       memset(errors, 0, num_fields);
-
-      add_row = WriteCompleteTuple(pool, field_locations_.data(), tuple_, tuple_row_mem,
-          template_tuple_, &errors[0], &error_in_row);
-
+      add_row = WriteCompleteTuple(row_batch->tuple_data_pool(), field_locations_.data(),
+          tuple_, tuple_row_mem, template_tuple_, &errors[0], &error_in_row);
       if (UNLIKELY(error_in_row)) {
         ReportTupleParseError(field_locations_.data(), errors);
         RETURN_IF_ERROR(parse_status_);
@@ -338,11 +333,13 @@ Status HdfsSequenceScanner::ProcessRange() {
       add_row = WriteTemplateTuples(tuple_row_mem, 1);
     }
     num_rows_read++;
-    if (add_row) RETURN_IF_ERROR(CommitRows(1));
-    if (scan_node_->ReachedLimit()) break;
+    if (add_row) RETURN_IF_ERROR(CommitRows(1, row_batch));
 
-    // Sequence files don't end with syncs
-    if (stream_->eof())  break;
+    // Sequence files don't end with syncs.
+    if (stream_->eof()) {
+      eos_ = true;
+      break;
+    }
 
     // Check for sync by looking for the marker that precedes syncs.
     int marker;
@@ -351,6 +348,10 @@ Status HdfsSequenceScanner::ProcessRange() {
       RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false));
       RETURN_IF_ERROR(ReadSync());
     }
+
+    // These checks must come after advancing past the next sync such that the stream is
+    // at the start of the next data block when this function is called again.
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
   }
 
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
@@ -454,19 +455,14 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
 }
 
 Status HdfsSequenceScanner::ReadCompressedBlock() {
-  // We are reading a new compressed block.  Pass the previous buffer pool
-  // bytes to the batch.  We don't need them anymore.
-  if (!decompressor_->reuse_output_buffer()) {
-    RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
-  }
-
+  int64_t num_buffered_records;
   RETURN_IF_FALSE(stream_->ReadVLong(
-      &num_buffered_records_in_compressed_block_, &parse_status_));
-  if (num_buffered_records_in_compressed_block_ < 0) {
+      &num_buffered_records, &parse_status_));
+  if (num_buffered_records < 0) {
     if (state_->LogHasSpace()) {
       stringstream ss;
       ss << "Bad compressed block record count: "
-         << num_buffered_records_in_compressed_block_;
+         << num_buffered_records;
       state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
     }
     return Status("bad record count");
@@ -490,7 +486,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
     return Status(ss.str());
   }
 
-  uint8_t* compressed_data = NULL;
+  uint8_t* compressed_data = nullptr;
   RETURN_IF_FALSE(stream_->ReadBytes(block_size, &compressed_data, &parse_status_));
 
   {
@@ -501,6 +497,6 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
     VLOG_FILE << "Decompressed " << block_size << " to " << len;
     next_record_in_compressed_block_ = unparsed_data_buffer_;
   }
-
+  num_buffered_records_in_compressed_block_ = num_buffered_records;
   return Status::OK();
 }
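
ProcessBlockCompressedScanRange() can now return in the middle of a compressed block once the output batch fills up, so the count of still-buffered records has to survive across calls, and ReadCompressedBlock() only publishes that count after decompression succeeds. A self-contained sketch of a reader that drains one decoded block across several fixed-size batches (toy types, not the sequence-file code itself):

    #include <cstdio>
    #include <vector>

    // Toy reader: each "block" decodes to five records; each call drains at most
    // 'capacity' records and remembers how many are still buffered for the next call.
    struct BlockReader {
      int num_buffered = 0;   // records left in the current decoded block
      int blocks_left = 2;

      bool ReadBlock() {      // only set the count once "decompression" succeeded
        if (blocks_left == 0) return false;
        --blocks_left;
        num_buffered = 5;
        return true;
      }

      // Fill 'batch' up to 'capacity'; returns false once all blocks are consumed.
      bool NextBatch(std::vector<int>* batch, int capacity) {
        if (num_buffered == 0 && !ReadBlock()) return false;
        while (num_buffered > 0 && static_cast<int>(batch->size()) < capacity) {
          batch->push_back(num_buffered--);
        }
        return true;
      }
    };

    int main() {
      BlockReader reader;
      std::vector<int> batch;
      int batches = 0;
      while (true) {
        batch.clear();
        if (!reader.NextBatch(&batch, 3)) break;
        ++batches;
        std::printf("batch %d has %zu records\n", batches, batch.size());
      }
      return 0;
    }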

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 01d5a68..4845edb 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -19,30 +19,29 @@
 #ifndef IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
 #define IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
 
-/// This scanner parses Sequence file located in HDFS, and writes the
-/// content as tuples in the Impala in-memory representation of data, e.g.
-/// (tuples, rows, row batches).
-//
+/// This scanner parses Sequence files and writes the content as tuples in the Impala
+/// in-memory representation of data (tuples, rows, row batches).
+///
 /// TODO: Make the various sequence file formats behave more similarly.  They should
 /// all have a structure similar to block compressed operating in batches rather than
 /// row at a time.
-//
+///
 /// org.apache.hadoop.io.SequenceFile is the original SequenceFile implementation
 /// and should be viewed as the canonical definition of this format. If
 /// anything is unclear in this file you should consult the code in
 /// org.apache.hadoop.io.SequenceFile.
-//
+///
 /// The following is a pseudo-BNF grammar for SequenceFile. Comments are prefixed
 /// with dashes:
-//
+///
 /// seqfile ::=
 ///   <file-header>
 ///   <record-block>+
-//
+///
 /// record-block ::=
 ///   <record>+
 ///   <file-sync-hash>
-//
+///
 /// file-header ::=
 ///   <file-version-header>
 ///   <file-key-class-name>
@@ -52,68 +51,68 @@
 ///   [<file-compression-codec-class>]
 ///   <file-header-metadata>
 ///   <file-sync-field>
-//
+///
 /// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
-//
+///
 /// -- The name of the Java class responsible for reading the key buffer
-//
+///
 /// file-key-class-name ::=
 ///   Text {"org.apache.hadoop.io.BytesWritable"}
-//
+///
 /// -- The name of the Java class responsible for reading the value buffer
-//
+///
 /// -- We don't care what this is.
 /// file-value-class-name ::=
-//
+///
 /// -- Boolean variable indicating whether or not the file uses compression
 /// -- for key/values in this file
-//
+///
 /// file-is-compressed ::= Byte[1]
-//
+///
 /// -- A boolean field indicating whether or not the file is block compressed.
-//
+///
 /// file-is-block-compressed ::= Byte[1] {false}
-//
+///
 /// -- The Java class name of the compression codec iff <file-is-compressed>
 /// -- is true. The named class must implement
 /// -- org.apache.hadoop.io.compress.CompressionCodec.
 /// -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
-//
+///
 /// file-compression-codec-class ::= Text
-//
+///
 /// -- A collection of key-value pairs defining metadata values for the
 /// -- file. The Map is serialized using standard JDK serialization, i.e.
 /// -- an Int corresponding to the number of key-value pairs, followed by
 /// -- Text key and value pairs.
-//
+///
 /// file-header-metadata ::= Map<Text, Text>
-//
+///
 /// -- A 16 byte marker that is generated by the writer. This marker appears
 /// -- at regular intervals at the beginning of records or record blocks
 /// -- intended to enable readers to skip to a random part of the file
 /// -- the sync hash is preceeded by a length of -1, refered to as the sync marker
-//
+///
 /// file-sync-hash ::= Byte[16]
-//
+///
 /// -- Records are all of one type as determined by the compression bits in the header
-//
+///
 /// record ::=
 ///   <uncompressed-record>     |
 ///   <block-compressed-record> |
 ///   <record-compressed-record>
-//
+///
 /// uncompressed-record ::=
 ///   <record-length>
 ///   <key-length>
 ///   <key>
 ///   <value>
-//
+///
 /// record-compressed-record ::=
 ///   <record-length>
 ///   <key-length>
 ///   <key>
 ///   <compressed-value>
-//
+///
 /// block-compressed-record ::=
 ///   <file-sync-field>
 ///   <key-lengths-block-size>
@@ -124,30 +123,30 @@
 ///   <value-lengths-block>
 ///   <values-block-size>
 ///   <values-block>
-//
+///
 /// record-length := Int
 /// key-length := Int
 /// keys-lengths-block-size> := Int
 /// value-lengths-block-size> := Int
-//
+///
 /// keys-block :: = Byte[keys-block-size]
 /// values-block :: = Byte[values-block-size]
-//
+///
 /// -- The key-lengths and value-lengths blocks are are a sequence of lengths encoded
 /// -- in ZeroCompressedInteger (VInt) format.
-//
+///
 /// key-lengths-block :: = Byte[key-lengths-block-size]
 /// value-lengths-block :: = Byte[value-lengths-block-size]
-//
+///
 /// Byte ::= An eight-bit byte
-//
+///
 /// VInt ::= Variable length integer. The high-order bit of each byte
 /// indicates whether more bytes remain to be read. The low-order seven
 /// bits are appended as increasingly more significant bits in the
 /// resulting integer value.
-//
+///
 /// Int ::= A four-byte integer in big-endian format.
-//
+///
 /// Text ::= VInt, Chars (Length prefixed UTF-8 characters)
 
 #include "exec/base-sequence-scanner.h"
@@ -167,20 +166,21 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   virtual ~HdfsSequenceScanner();
 
   /// Implementation of HdfsScanner interface.
-  virtual Status Open(ScannerContext* context);
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
-  /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
+  /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
       const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** write_aligned_tuples_fn);
+      llvm::Function** write_aligned_tuples_fn)
+      WARN_UNUSED_RESULT;
 
  protected:
   /// Implementation of sequence container super class methods.
   virtual FileHeader* AllocateFileHeader();
-  virtual Status ReadFileHeader();
-  virtual Status InitNewRange();
-  virtual Status ProcessRange();
+  virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
+  virtual Status InitNewRange() WARN_UNUSED_RESULT;
+  virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   virtual THdfsFileFormat::type file_format() const {
     return THdfsFileFormat::SEQUENCE_FILE;
@@ -189,35 +189,37 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
  private:
   /// Maximum size of a compressed block.  This is used to check for corrupted
   /// block size so we do not read the whole file before we detect the error.
-  const static int MAX_BLOCK_SIZE = (1024 * 1024 * 1024);
+  static const int64_t MAX_BLOCK_SIZE = 1024 * 1024 * 1024;
 
   /// The value class name located in the SeqFile Header.
   /// This is always "org.apache.hadoop.io.Text"
   static const char* const SEQFILE_VALUE_CLASS_NAME;
 
-  /// Read the record header.
-  /// Sets:
-  ///   current_block_length_
-  Status ReadBlockHeader();
+  /// Reads the record header and sets 'current_block_length_'.
+  Status ReadBlockHeader() WARN_UNUSED_RESULT;
 
-  /// Process an entire block compressed scan range.  Block compressed ranges are
-  /// more common and can be parsed more efficiently in larger pieces.
-  Status ProcessBlockCompressedScanRange();
+  /// Processes or continues processing a block-compressed scan range, adding tuples
+  /// to 'row_batch'. Block-compressed ranges are common and can be parsed more
+  /// efficiently in larger pieces.
+  Status ProcessBlockCompressedScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
-  /// Read a compressed block. Does NOT read sync or -1 marker preceding sync.
-  /// Decompress to unparsed_data_buffer_ allocated from unparsed_data_buffer_pool_.
-  Status ReadCompressedBlock();
+  /// Reads a compressed block. Does NOT read sync or -1 marker preceding sync.
+  /// Decompresses the data into 'unparsed_data_buffer_' allocated from the
+  /// 'data_buffer_pool_' via the decompressor.
+  /// Sets 'num_buffered_records_in_compressed_block_' if decompression was
+  /// successful.
+  Status ReadCompressedBlock() WARN_UNUSED_RESULT;
 
-  /// Utility function for parsing next_record_in_compressed_block_. Called by
-  /// ProcessBlockCompressedScanRange.
-  Status ProcessDecompressedBlock();
+  /// Utility function for parsing 'next_record_in_compressed_block_'. Called by
+  /// ProcessBlockCompressedScanRange().
+  Status ProcessDecompressedBlock(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
-  /// Read compressed or uncompressed records from the byte stream into memory
-  /// in unparsed_data_buffer_pool_.  Not used for block compressed files.
+  /// Read a single record from the current position in 'stream_', decompressing
+  /// the record, if necessary. Not used for block compressed files.
   /// Output:
-  ///   record_ptr: ponter to the record.
+  ///   record_ptr: pointer to the record
   ///   record_len: length of the record
-  Status GetRecord(uint8_t** record_ptr, int64_t *record_len);
+  Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;
 
   /// Helper class for picking fields and rows from delimited text.
   boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
@@ -235,25 +237,24 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
     int64_t len;
   };
 
-  /// Records are processed in batches.  This vector stores batches of record locations
+  /// Records are processed in batches. This vector stores batches of record locations
   /// that are being processed.
-  /// TODO: better perf not to use vector?
   std::vector<RecordLocation> record_locations_;
 
   /// Length of the current sequence file block (or record).
-  int current_block_length_;
+  int current_block_length_ = -1;
 
   /// Length of the current key.  This is specified as 4 bytes in the format description.
-  int current_key_length_;
+  int current_key_length_ = -1;
 
-  /// Buffer for data read from HDFS or from decompressing the HDFS data.
-  uint8_t* unparsed_data_buffer_;
+  /// Buffer for data read from the 'stream_' directly or after decompression.
+  uint8_t* unparsed_data_buffer_ = nullptr;
 
   /// Number of buffered records unparsed_data_buffer_ from block compressed data.
-  int64_t num_buffered_records_in_compressed_block_;
+  int64_t num_buffered_records_in_compressed_block_ = 0;
 
   /// Next record from block compressed data.
-  uint8_t* next_record_in_compressed_block_;
+  uint8_t* next_record_in_compressed_block_ = nullptr;
 };
 
 } // namespace impala
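
The header comment above describes VInt as a variable-length integer whose high-order bit marks continuation and whose low seven bits accumulate from least to most significant. Below is a small decoder written directly from that description; it is a sketch only, and as the comment itself advises, org.apache.hadoop.io.SequenceFile remains the canonical definition of the format:

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Decodes one variable-length integer per the description above: the high bit of
    // each byte means "more bytes follow"; the low seven bits accumulate from least
    // to most significant. Returns the number of bytes consumed, or 0 on truncation.
    size_t DecodeVInt(const std::vector<uint8_t>& buf, size_t pos, uint64_t* value) {
      uint64_t result = 0;
      int shift = 0;
      size_t consumed = 0;
      while (pos + consumed < buf.size()) {
        uint8_t b = buf[pos + consumed++];
        result |= static_cast<uint64_t>(b & 0x7f) << shift;
        shift += 7;
        if ((b & 0x80) == 0) {  // high bit clear: this was the last byte
          *value = result;
          return consumed;
        }
      }
      return 0;  // ran out of input in the middle of an integer
    }

    int main() {
      std::vector<uint8_t> buf = {0xac, 0x02};  // 0x2c | (0x02 << 7) = 300
      uint64_t v = 0;
      size_t n = DecodeVInt(buf, 0, &v);
      std::printf("decoded %llu from %zu bytes\n",
                  static_cast<unsigned long long>(v), n);
      return 0;
    }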

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index c1a12d6..378b45e 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -17,6 +17,8 @@
 
 #include "exec/hdfs-text-scanner.h"
 
+#include <memory>
+
 #include "codegen/llvm-codegen.h"
 #include "exec/delimited-text-parser.h"
 #include "exec/delimited-text-parser.inline.h"
@@ -152,22 +154,6 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   return Status::OK();
 }
 
-Status HdfsTextScanner::ProcessSplit() {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
-  do {
-    batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-        scan_node_->mem_tracker());
-    RETURN_IF_ERROR(GetNextInternal(batch_));
-    scan_node->AddMaterializedRowBatch(batch_);
-  } while (!eos_ && !scan_node_->ReachedLimit());
-
-  // Transfer the remaining resources to this new batch in Close().
-  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-      scan_node_->mem_tracker());
-  return Status::OK();
-}
-
 void HdfsTextScanner::Close(RowBatch* row_batch) {
   DCHECK(!is_closed_);
   // Need to close the decompressor before transferring the remaining resources to
@@ -183,7 +169,8 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
     row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
     context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
-      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          unique_ptr<RowBatch>(row_batch));
     }
   } else {
     if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll();
@@ -201,7 +188,7 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
     scan_node_->RangeComplete(THdfsFileFormat::TEXT,
         stream_->file_desc()->file_compression);
   }
-  HdfsScanner::Close(row_batch);
+  CloseInternal();
 }
 
 Status HdfsTextScanner::InitNewRange() {
@@ -321,14 +308,14 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
         DCHECK_LE(num_tuples, 1);
         DCHECK_GE(num_tuples, 0);
         COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
-        RETURN_IF_ERROR(CommitRows(num_tuples, false, row_batch));
+        RETURN_IF_ERROR(CommitRows(num_tuples, row_batch));
       } else if (delimited_text_parser_->HasUnfinishedTuple()) {
         DCHECK(scan_node_->materialized_slots().empty());
         DCHECK_EQ(scan_node_->num_materialized_partition_keys(), 0);
         // If no fields are materialized we do not update partial_tuple_empty_,
         // boundary_column_, or boundary_row_. However, we still need to handle the case
         // of partial tuple due to missing tuple delimiter at the end of file.
-        RETURN_IF_ERROR(CommitRows(1, false, row_batch));
+        RETURN_IF_ERROR(CommitRows(1, row_batch));
       }
       break;
     }
@@ -417,7 +404,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
       }
       RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row));
     }
-    RETURN_IF_ERROR(CommitRows(num_tuples_materialized, false, row_batch));
+    RETURN_IF_ERROR(CommitRows(num_tuples_materialized, row_batch));
 
     // Already past the scan range and attempting to complete the last row.
     if (scan_state_ == PAST_SCAN_RANGE) break;
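
The per-format ProcessSplit() deleted above drove GetNextInternal() in a loop and queued
each finished batch; presumably that loop now lives once in shared scanner code rather
than in every format's scanner. A self-contained sketch of the loop's shape, with
stand-in types in place of RowBatch and the row batch queue:

#include <memory>
#include <queue>
#include <vector>

struct Batch { std::vector<int> rows; };

int main() {
  std::queue<std::unique_ptr<Batch>> queue;    // stands in for the row batch queue
  int produced = 0;
  const int kLimit = 3;                        // stands in for ReachedLimit()
  bool eos = false;

  do {
    auto batch = std::make_unique<Batch>();
    batch->rows = {produced, produced + 1};    // stands in for GetNextInternal()
    eos = (++produced == 5);
    queue.push(std::move(batch));              // hand ownership to the consumer
  } while (!eos && produced < kLimit);

  return static_cast<int>(queue.size());       // 3 batches queued
}

The termination condition mirrors the deleted code: stop at end of stream or once the
node's row limit is reached.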

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index af8fa8a..ceeda8d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -51,19 +51,18 @@ class HdfsTextScanner : public HdfsScanner {
   virtual ~HdfsTextScanner();
 
   /// Implementation of HdfsScanner interface.
-  virtual Status Open(ScannerContext* context);
-  virtual Status ProcessSplit();
+  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch);
 
   /// Issue io manager byte ranges for 'files'.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-      const std::vector<HdfsFileDesc*>& files);
+      const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT;
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
       const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** write_aligned_tuples_fn);
+      llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
 
   /// Suffix for lzo index files.
   const static std::string LZO_INDEX_SUFFIX;
@@ -71,11 +70,11 @@ class HdfsTextScanner : public HdfsScanner {
   static const char* LLVM_CLASS_NAME;
 
  protected:
-  virtual Status GetNextInternal(RowBatch* row_batch);
+  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Reset the scanner.  This clears any partial state that needs to
   /// be cleared when starting or when restarting after an error.
-  Status ResetScanner();
+  Status ResetScanner() WARN_UNUSED_RESULT;
 
   /// Current position in byte buffer.
   char* byte_buffer_ptr_;
@@ -103,14 +102,14 @@ class HdfsTextScanner : public HdfsScanner {
 
   /// Initializes this scanner for this context.  The context maps to a single
   /// scan range. Advances the scan state to SCAN_RANGE_INITIALIZED.
-  virtual Status InitNewRange();
+  virtual Status InitNewRange() WARN_UNUSED_RESULT;
 
   /// Finds the start of the first tuple in this scan range and initializes
   /// 'byte_buffer_ptr_' to point to the start of first tuple. Advances the scan state
   /// to FIRST_TUPLE_FOUND, if successful. Otherwise, consumes the whole scan range
   /// and does not update the scan state (e.g. if there are really large columns).
   /// Only valid to call in scan state SCAN_RANGE_INITIALIZED.
-  Status FindFirstTuple(MemPool* pool);
+  Status FindFirstTuple(MemPool* pool) WARN_UNUSED_RESULT;
 
   /// When in scan state FIRST_TUPLE_FOUND, starts or continues processing the scan range
   /// by reading bytes from 'context_'. Adds materialized tuples that pass the conjuncts
@@ -122,11 +121,11 @@ class HdfsTextScanner : public HdfsScanner {
   /// Advances the scan state to PAST_SCAN_RANGE if all bytes in the scan range have been
   /// processed.
   /// Only valid to call in scan state FIRST_TUPLE_FOUND or PAST_SCAN_RANGE.
-  Status ProcessRange(RowBatch* row_batch, int* num_tuples);
+  Status ProcessRange(RowBatch* row_batch, int* num_tuples) WARN_UNUSED_RESULT;
 
   /// Reads past the end of the scan range for the next tuple end. If successful,
   /// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE.
-  Status FinishScanRange(RowBatch* row_batch);
+  Status FinishScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Fills the next byte buffer from the context.  This will block if there are no bytes
   /// ready.  Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_.
@@ -137,11 +136,12 @@ class HdfsTextScanner : public HdfsScanner {
   /// If applicable, attaches decompression buffers from previous calls that might still
   /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are
   /// freed instead.
-  virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0);
+  virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0)
+      WARN_UNUSED_RESULT;
 
   /// Fills the next byte buffer from the compressed data in stream_ by reading the entire
   /// file, decompressing it, and setting the byte_buffer_ptr_ to the decompressed buffer.
-  Status FillByteBufferCompressedFile(bool* eosr);
+  Status FillByteBufferCompressedFile(bool* eosr) WARN_UNUSED_RESULT;
 
   /// Fills the next byte buffer from the compressed data in stream_. Unlike
   /// FillByteBufferCompressedFile(), the entire file does not need to be read at once.
@@ -149,21 +149,21 @@ class HdfsTextScanner : public HdfsScanner {
   /// to available decompressed data.
   /// Attaches decompression buffers from previous calls that might still be referenced
   /// by returned batches to 'pool'. If 'pool' is nullptr the buffers are freed instead.
-  Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr);
+  Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr) WARN_UNUSED_RESULT;
 
   /// Used by FillByteBufferCompressedStream() to decompress data from 'stream_'.
   /// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input.
   /// If bytes_to_read > 0, will read specified size.
   /// If bytes_to_read = -1, will call GetBuffer().
   Status DecompressBufferStream(int64_t bytes_to_read, uint8_t** decompressed_buffer,
-      int64_t* decompressed_len, bool *eosr);
+      int64_t* decompressed_len, bool *eosr) WARN_UNUSED_RESULT;
 
   /// Checks if the current buffer ends with a row delimiter spanning this and the next
   /// buffer (i.e. a "\r\n" delimiter). Does not modify byte_buffer_ptr_, etc. Always
   /// returns false if the table's row delimiter is not '\n'. This can only be called
   /// after the buffer has been fully parsed, i.e. when byte_buffer_ptr_ ==
   /// byte_buffer_end_.
-  Status CheckForSplitDelimiter(bool* split_delimiter);
+  Status CheckForSplitDelimiter(bool* split_delimiter) WARN_UNUSED_RESULT;
 
   /// Prepends field data that was from the previous file buffer (This field straddled two
   /// file buffers). 'data' already contains the pointer/len from the current file buffer,
@@ -171,7 +171,7 @@ class HdfsTextScanner : public HdfsScanner {
   /// This function will allocate a new string from the tuple pool, concatenate the
   /// two pieces and update 'data' to contain the new pointer/len. Return error status if
   /// memory limit is exceeded when allocating a new string.
-  Status CopyBoundaryField(FieldLocation* data, MemPool* pool);
+  Status CopyBoundaryField(FieldLocation* data, MemPool* pool) WARN_UNUSED_RESULT;
 
   /// Writes intermediate parsed data into 'tuple_', evaluates conjuncts, and appends
   /// surviving rows to 'row'. Advances 'tuple_' and 'row' as necessary.
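
Most of the changes in this header simply tag Status-returning methods with
WARN_UNUSED_RESULT so that a dropped Status becomes a compiler warning. A small sketch of
the effect, using the common GCC/Clang spelling of the attribute (Impala's own macro
definition may differ):

#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

struct Status {
  bool ok;
};

Status MightFail() WARN_UNUSED_RESULT;
Status MightFail() { return Status{false}; }

int main() {
  MightFail();                    // -Wunused-result: return value ignored
  Status s = MightFail();         // fine: the caller must now decide what to do
  return s.ok ? 0 : 1;
}

With C++17 the standard [[nodiscard]] attribute gives the same behaviour.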

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 08a3339..0845d9a 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -97,9 +97,9 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   }
 
   *eos = false;
-  RowBatch* materialized_batch = materialized_row_batches_->GetBatch();
+  unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
   if (materialized_batch != NULL) {
-    row_batch->AcquireState(materialized_batch);
+    row_batch->AcquireState(materialized_batch.get());
     num_rows_returned_ += row_batch->num_rows();
     COUNTER_SET(rows_returned_counter_, num_rows_returned_);
 
@@ -114,7 +114,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
       done_ = true;
       materialized_row_batches_->Shutdown();
     }
-    delete materialized_batch;
+    materialized_batch.reset();
   } else {
     *eos = true;
   }
@@ -172,15 +172,16 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
   RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token));
   bool eos = false;
   while (!eos && !done_) {
-    gscoped_ptr<RowBatch> row_batch(new RowBatch(
-        row_desc(), runtime_state_->batch_size(), mem_tracker()));
+    unique_ptr<RowBatch> row_batch = std::make_unique<RowBatch>(row_desc(),
+        runtime_state_->batch_size(), mem_tracker());
     RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos));
     while (!done_) {
       scanner->KeepKuduScannerAlive();
-      if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) {
-        ignore_result(row_batch.release());
+      if (materialized_row_batches_->BlockingPutWithTimeout(move(row_batch), 1000000)) {
         break;
       }
+      // Make sure that we still own the RowBatch if BlockingPutWithTimeout() timed out.
+      DCHECK(row_batch != nullptr);
     }
   }
   if (eos) scan_ranges_complete_counter()->Add(1);
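
The retry loop above depends on BlockingPutWithTimeout() taking the batch by rvalue
reference and moving from it only when the element is actually enqueued, which is why the
DCHECK can assert that 'row_batch' is still non-null after a timeout. A self-contained
sketch of that ownership contract (not Impala's BlockingQueue; the capacity and the
simulated timeout are stand-ins):

#include <deque>
#include <memory>

struct RowBatch { int rows = 0; };

class TimedQueue {
 public:
  // Returns false (and leaves 'item' untouched) when the queue is "full".
  bool BlockingPutWithTimeout(std::unique_ptr<RowBatch>&& item) {
    if (items_.size() >= kCapacity) return false;   // simulated timeout
    items_.push_back(std::move(item));              // take ownership only here
    return true;
  }

 private:
  static constexpr size_t kCapacity = 1;
  std::deque<std::unique_ptr<RowBatch>> items_;
};

int main() {
  TimedQueue queue;
  auto a = std::make_unique<RowBatch>();
  auto b = std::make_unique<RowBatch>();

  bool put_a = queue.BlockingPutWithTimeout(std::move(a));
  // put_a is true and 'a' is now nullptr: ownership moved into the queue.

  bool put_b = queue.BlockingPutWithTimeout(std::move(b));
  // put_b is false and 'b' is still non-null: the caller keeps ownership and can wait
  // and retry, which is exactly what the DCHECK in the loop above verifies.

  return (put_a && !put_b && a == nullptr && b != nullptr) ? 0 : 1;
}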

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 69fd53d..87b9c71 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -85,7 +85,7 @@ class ScanNode : public ExecNode {
       active_scanner_thread_counter_(TUnit::UNIT, 0),
       active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
 
-  virtual Status Prepare(RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
 
   /// This should be called before Prepare(), and the argument must not be destroyed until
   /// after Prepare().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 5d88397..a32345c 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -23,6 +23,7 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <deque>
+#include <memory>
 #include <unistd.h>
 
 #include "common/atomic.h"
@@ -92,7 +93,7 @@ class BlockingQueue : public CacheLineAligned {
     }
 
     DCHECK(!get_list_.empty());
-    *out = get_list_.front();
+    *out = std::move(get_list_.front());
     get_list_.pop_front();
     get_list_size_.Store(get_list_.size());
     read_lock.unlock();
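
Moving rather than copying the front element matters once the queue's element type is
move-only (e.g. std::unique_ptr<RowBatch>): a copy-assignment from front() would not
compile, and for copyable payloads the move also avoids a deep copy. A tiny
self-contained sketch of the same idiom:

#include <deque>
#include <memory>

int main() {
  std::deque<std::unique_ptr<int>> get_list;
  get_list.push_back(std::make_unique<int>(42));

  std::unique_ptr<int> out;
  // out = get_list.front();            // would not compile: unique_ptr is not copyable
  out = std::move(get_list.front());    // transfers ownership out of the deque
  get_list.pop_front();                 // discard the now-empty slot

  return (out != nullptr && *out == 42) ? 0 : 1;
}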

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
index e396583..3f62b2e 100644
--- a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
@@ -10,8 +10,7 @@ string
 row_regex: .*Problem parsing file $NAMENODE/.*
 File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/truncated_string.avro' is corrupt: truncated data block at offset 155
 File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/negative_string_len.avro' is corrupt: invalid length -7 at offset 164
-File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid union value 4 at offset 174
-File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid encoded integer at offset 191
+File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid union value 4 at offset 174 (1 of 2 similar)
 ====
 ---- QUERY
 # Read from the corrupt files. We may get partial results.